From fbfb4b15cc701502d507de160fb06042b96c07aa Mon Sep 17 00:00:00 2001 From: eshitachandwani Date: Tue, 14 Oct 2025 21:48:37 +0530 Subject: [PATCH 01/11] LDS/RDS to deps manager --- internal/grpctest/tlogger.go | 45 +- .../xds/xdsclient/xdsresource/xdsconfig.go | 95 +++ internal/xds/xdsdependencymanager/logging.go | 34 + .../xds/xdsdependencymanager/watch_service.go | 92 +++ .../xds_dependency_manager.go | 241 ++++++ .../xds_dependency_manager_test.go | 769 ++++++++++++++++++ 6 files changed, 1271 insertions(+), 5 deletions(-) create mode 100644 internal/xds/xdsclient/xdsresource/xdsconfig.go create mode 100644 internal/xds/xdsdependencymanager/logging.go create mode 100644 internal/xds/xdsdependencymanager/watch_service.go create mode 100644 internal/xds/xdsdependencymanager/xds_dependency_manager.go create mode 100644 internal/xds/xdsdependencymanager/xds_dependency_manager_test.go diff --git a/internal/grpctest/tlogger.go b/internal/grpctest/tlogger.go index 492381758762..0d083b048ab6 100644 --- a/internal/grpctest/tlogger.go +++ b/internal/grpctest/tlogger.go @@ -66,10 +66,11 @@ type tLogger struct { v int initialized bool - mu sync.Mutex // guards t, start, and errors - t *testing.T - start time.Time - errors map[*regexp.Regexp]int + mu sync.Mutex // guards t, start, and errors + t *testing.T + start time.Time + errors map[*regexp.Regexp]int + warnings map[*regexp.Regexp]int } func init() { @@ -87,7 +88,7 @@ func init() { } } // Initialize tLogr with the determined verbosity level. - tLogr = &tLogger{errors: make(map[*regexp.Regexp]int), v: vLevel} + tLogr = &tLogger{errors: make(map[*regexp.Regexp]int), warnings: make(map[*regexp.Regexp]int), v: vLevel} } // getCallingPrefix returns the at the given depth from the stack. @@ -120,6 +121,9 @@ func (tl *tLogger) log(ltype logType, depth int, format string, args ...any) { } else { tl.t.Error(args...) } + case warningLog: + tl.expectedWarning(fmt.Sprintln(args...)) + tl.t.Log(args...) case fatalLog: panic(fmt.Sprint(args...)) default: @@ -155,6 +159,7 @@ func (tl *tLogger) update(t *testing.T) { tl.t = t tl.start = time.Now() tl.errors = map[*regexp.Regexp]int{} + tl.warnings = map[*regexp.Regexp]int{} } // ExpectError declares an error to be expected. For the next test, the first @@ -178,6 +183,18 @@ func ExpectErrorN(expr string, n int) { tLogr.errors[re] += n } +// ExpectWarning declares a warning to be expected. +func ExpectWarning(expr string) { + tLogr.mu.Lock() + defer tLogr.mu.Unlock() + re, err := regexp.Compile(expr) + if err != nil { + tLogr.t.Error(err) + return + } + tLogr.warnings[re]++ +} + // endTest checks if expected errors were not encountered. func (tl *tLogger) endTest(t *testing.T) { tl.mu.Lock() @@ -188,6 +205,12 @@ func (tl *tLogger) endTest(t *testing.T) { } } tl.errors = map[*regexp.Regexp]int{} + for re, count := range tl.warnings { + if count > 0 { + t.Errorf("Expected warning '%v' not encountered", re.String()) + } + } + tl.warnings = map[*regexp.Regexp]int{} } // expected determines if the error string is protected or not. @@ -204,6 +227,18 @@ func (tl *tLogger) expected(s string) bool { return false } +// expectedWarning determines if the warning string is protected or not. +func (tl *tLogger) expectedWarning(s string) { + for re, count := range tl.warnings { + if re.FindStringIndex(s) != nil { + tl.warnings[re]-- + if count <= 1 { + delete(tl.warnings, re) + } + } + } +} + func (tl *tLogger) Info(args ...any) { tl.log(infoLog, 0, "", args...) } diff --git a/internal/xds/xdsclient/xdsresource/xdsconfig.go b/internal/xds/xdsclient/xdsresource/xdsconfig.go new file mode 100644 index 000000000000..8b8614edceb7 --- /dev/null +++ b/internal/xds/xdsclient/xdsresource/xdsconfig.go @@ -0,0 +1,95 @@ +/* + * + * Copyright 2025 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package xdsresource + +import "google.golang.org/grpc/resolver" + +// XDSConfig holds the complete and resolved xDS resource configuration +// including LDS, RDS, CDS and endpoints. +type XDSConfig struct { + // Listener is the listener resource update + Listener ListenerUpdate + + // RouteConfig is the route configuration resource update. It will be + // populated even if RouteConfig is inlined into the Listener resource. + RouteConfig RouteConfigUpdate + + // VirtualHost is the virtual host from the route configuration matched with + // dataplane authority . + VirtualHost *VirtualHost + + // Clusters maps the cluster name with the ClusterResult which will have + // either the cluster configuration or error. It will have an error status + // if either + // + // (a) there was an error and we did not already have a valid resource or + // + // (b) the resource does not exist. + Clusters map[string]*ClusterResult +} + +// ClusterResult contains either a cluster's configuration or an error. +type ClusterResult struct { + Config ClusterConfig + Err error +} + +// ClusterConfig contains cluster configuration for a single cluster. +type ClusterConfig struct { + Cluster ClusterUpdate // Cluster configuration. Always present. + EndpointConfig EndpointConfig // Endpoint configuration for leaf clusters which will of type EDS or DNS. + AggregateConfig AggregateConfig // List of children for aggregate clusters. +} + +// AggregateConfig contains a list of leaf cluster names. +type AggregateConfig struct { + LeafClusters []string +} + +// EndpointConfig contains resolved endpoints for a leaf cluster either from DNS +// or EDS watchers and error. +type EndpointConfig struct { + EDSUpdate EndpointsUpdate // Resolved endpoints for EDS clusters. + DNSEndpoints DNSUpdate // Resolved endpoints for LOGICAL_DNS clusters. + ResolutionNote error // Error obtaining endpoints data +} + +// DNSUpdate contains the update from DNS resolver. +type DNSUpdate struct { + Endpoints []resolver.Endpoint +} + +// xdsConfigkey is the type used as the key to store XDSConfig in +// the Attributes field of resolver.states. +type xdsConfigkey struct{} + +// SetXDSConfig returns a copy of state in which the Attributes field +// is updated with the XDSConfig. +func SetXDSConfig(state resolver.State, config *XDSConfig) resolver.State { + state.Attributes = state.Attributes.WithValue(xdsConfigkey{}, config) + return state +} + +// XDSConfigFromResolverState returns XDSConfig stored in attribute in resolver state. +func XDSConfigFromResolverState(state resolver.State) *XDSConfig { + state.Attributes.Value(xdsConfigkey{}) + if v := state.Attributes.Value(xdsConfigkey{}); v != nil { + return v.(*XDSConfig) + } + return nil +} diff --git a/internal/xds/xdsdependencymanager/logging.go b/internal/xds/xdsdependencymanager/logging.go new file mode 100644 index 000000000000..8bce77dc29d2 --- /dev/null +++ b/internal/xds/xdsdependencymanager/logging.go @@ -0,0 +1,34 @@ +/* + * + * Copyright 2025 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package xdsdepmgr + +import ( + "fmt" + + "google.golang.org/grpc/grpclog" + internalgrpclog "google.golang.org/grpc/internal/grpclog" +) + +const prefix = "[xds-dependency-manager %p] " + +var logger = grpclog.Component("xds") + +func prefixLogger(p *DependencyManager) *internalgrpclog.PrefixLogger { + return internalgrpclog.NewPrefixLogger(logger, fmt.Sprintf(prefix, p)) +} diff --git a/internal/xds/xdsdependencymanager/watch_service.go b/internal/xds/xdsdependencymanager/watch_service.go new file mode 100644 index 000000000000..3b3c65ba29db --- /dev/null +++ b/internal/xds/xdsdependencymanager/watch_service.go @@ -0,0 +1,92 @@ +/* + * + * Copyright 2020 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package xdsdepmgr + +import ( + "context" + + "google.golang.org/grpc/internal/xds/xdsclient/xdsresource" +) + +type listenerWatcher struct { + resourceName string + cancel func() + parent *DependencyManager +} + +func newListenerWatcher(resourceName string, parent *DependencyManager) *listenerWatcher { + lw := &listenerWatcher{resourceName: resourceName, parent: parent} + lw.cancel = xdsresource.WatchListener(parent.xdsClient, resourceName, lw) + return lw +} + +func (l *listenerWatcher) ResourceChanged(update *xdsresource.ListenerResourceData, onDone func()) { + handleUpdate := func(context.Context) { l.parent.onListenerResourceUpdate(update.Resource); onDone() } + l.parent.serializer.ScheduleOr(handleUpdate, onDone) +} + +func (l *listenerWatcher) ResourceError(err error, onDone func()) { + handleError := func(context.Context) { l.parent.onListenerResourceError(err); onDone() } + l.parent.serializer.ScheduleOr(handleError, onDone) +} + +func (l *listenerWatcher) AmbientError(err error, onDone func()) { + handleError := func(context.Context) { l.parent.onListenerResourceAmbientError(err); onDone() } + l.parent.serializer.ScheduleOr(handleError, onDone) +} + +func (l *listenerWatcher) stop() { + l.cancel() + l.parent.logger.Infof("Canceling watch on Listener resource %q", l.resourceName) +} + +type routeConfigWatcher struct { + resourceName string + cancel func() + parent *DependencyManager +} + +func newRouteConfigWatcher(resourceName string, parent *DependencyManager) *routeConfigWatcher { + rw := &routeConfigWatcher{resourceName: resourceName, parent: parent} + rw.cancel = xdsresource.WatchRouteConfig(parent.xdsClient, resourceName, rw) + return rw +} + +func (r *routeConfigWatcher) ResourceChanged(u *xdsresource.RouteConfigResourceData, onDone func()) { + handleUpdate := func(context.Context) { + r.parent.onRouteConfigResourceUpdate(r.resourceName, u.Resource) + onDone() + } + r.parent.serializer.ScheduleOr(handleUpdate, onDone) +} + +func (r *routeConfigWatcher) ResourceError(err error, onDone func()) { + handleError := func(context.Context) { r.parent.onRouteConfigResourceError(r.resourceName, err); onDone() } + r.parent.serializer.ScheduleOr(handleError, onDone) +} + +func (r *routeConfigWatcher) AmbientError(err error, onDone func()) { + handleError := func(context.Context) { r.parent.onRouteConfigResourceAmbientError(r.resourceName, err); onDone() } + r.parent.serializer.ScheduleOr(handleError, onDone) +} + +func (r *routeConfigWatcher) stop() { + r.cancel() + r.parent.logger.Infof("Canceling watch on RouteConfiguration resource %q", r.resourceName) +} diff --git a/internal/xds/xdsdependencymanager/xds_dependency_manager.go b/internal/xds/xdsdependencymanager/xds_dependency_manager.go new file mode 100644 index 000000000000..3234c3e9959f --- /dev/null +++ b/internal/xds/xdsdependencymanager/xds_dependency_manager.go @@ -0,0 +1,241 @@ +/* + * Copyright 2025 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +// Package xdsdepmgr contains functions, structs, and utilities for working with +// xds dependency manager. It will handle all the xds resources like listener, +// route, cluster and endpoint resources and their dependencies. +package xdsdepmgr + +import ( + "context" + + "google.golang.org/grpc/codes" + "google.golang.org/grpc/internal/grpclog" + "google.golang.org/grpc/internal/grpcsync" + "google.golang.org/grpc/internal/pretty" + "google.golang.org/grpc/internal/status" + "google.golang.org/grpc/internal/xds/xdsclient" + "google.golang.org/grpc/internal/xds/xdsclient/xdsresource" +) + +// DependencyManager struct stores all the information needed for watching all +// the xds resources. +type DependencyManager struct { + xdsClient xdsclient.XDSClient + + watcher ConfigWatcher + // All methods on the xdsResolver type except for the ones invoked by gRPC, + // i.e ResolveNow() and Close(), are guaranteed to execute in the context of + // this serializer's callback. And since the serializer guarantees mutual + // exclusion among these callbacks, we can get by without any mutexes to + // access all of the below defined state. The only exception is Close(), + // which does access some of this shared state, but it does so after + // cancelling the context passed to the serializer. + serializer *grpcsync.CallbackSerializer + serializerCancel context.CancelFunc + + // dataplaneAuthority is the authority used for the data plane connections, + // which is also used to select the VirtualHost within the xDS + // RouteConfiguration. This is %-encoded to match with VirtualHost Domain + // in xDS RouteConfiguration. + dataplaneAuthority string + + ldsResourceName string + listenerWatcher *listenerWatcher + currentListenerUpdate xdsresource.ListenerUpdate + + rdsResourceName string + currentRouteConfig xdsresource.RouteConfigUpdate + routeConfigWatcher *routeConfigWatcher + currentVirtualHost *xdsresource.VirtualHost + + logger *grpclog.PrefixLogger +} + +// New creates a new dependency manager that manages all the xds resources +// including LDS, RDS, CDS and EDS and sends update once we have all the +// resources and sends an error when we get error in listener or route +// resources. +func New(listenername, dataplaneAuthority string, xdsClient xdsclient.XDSClient, watcher ConfigWatcher) *DependencyManager { + // Builds the dependency manager and starts the listener watch. + dm := &DependencyManager{ + ldsResourceName: listenername, + dataplaneAuthority: dataplaneAuthority, + xdsClient: xdsClient, + watcher: watcher, + } + dm.logger = prefixLogger(dm) + + // Initialize the serializer used to synchronize the updates from the xDS + // client. + ctx, cancel := context.WithCancel(context.Background()) + dm.serializer = grpcsync.NewCallbackSerializer(ctx) + dm.serializerCancel = cancel + + // Start the listener watch. Listener watch will start the other resource + // watches as needed. + dm.listenerWatcher = newListenerWatcher(listenername, dm) + return dm +} + +// Close closes the dependency manager and stops all the watchers registered. +func (m *DependencyManager) Close() { + // Cancel the context passed to the serializer and wait for any scheduled + // callbacks to complete. Canceling the context ensures that no new + // callbacks will be scheduled. + m.serializerCancel() + <-m.serializer.Done() + + if m.listenerWatcher != nil { + m.listenerWatcher.stop() + } + if m.routeConfigWatcher != nil { + m.routeConfigWatcher.stop() + } +} + +// ConfigWatcher is notified of the XDSConfig resource updates and errors that +// are received by the xDS client from the management server. It only receives a +// XDSConfig update after all the xds resources have been received. +type ConfigWatcher interface { + // OnUpdate is invoked by the dependency manager to provide a new, + // validated xDS configuration to the watcher. + OnUpdate(xdsresource.XDSConfig) + + // OnError is invoked when an error is received in listener or route + // resource. This includes cases where: + // - The listener or route resource watcher reports a resource error. + // - The received listener resource is a socket listener, not an API listener - TODO : This is not yet implemented, tracked here #8114 + // - The received route configuration does not contain a virtual host + // matching the channel's default authority. + OnError(error) +} + +func (m *DependencyManager) maybeSendUpdate() { + if m.logger.V(2) { + m.logger.Infof("Sending update to watcher: Listener: %v, RouteConfig: %v", pretty.ToJSON(m.currentListenerUpdate), pretty.ToJSON(m.currentRouteConfig)) + } + m.watcher.OnUpdate(xdsresource.XDSConfig{ + Listener: m.currentListenerUpdate, + RouteConfig: m.currentRouteConfig, + VirtualHost: m.currentVirtualHost, + }) +} + +func (m *DependencyManager) applyRouteConfigUpdate(update xdsresource.RouteConfigUpdate) { + matchVh := xdsresource.FindBestMatchingVirtualHost(m.dataplaneAuthority, update.VirtualHosts) + if matchVh == nil { + m.watcher.OnError(status.Errorf(codes.Unavailable, "Could not find VirtualHost for %q", m.dataplaneAuthority)) + return + } + m.currentRouteConfig = update + m.currentVirtualHost = matchVh + m.maybeSendUpdate() +} + +// Only executed in the context of a serializer callback. +func (m *DependencyManager) onListenerResourceUpdate(update xdsresource.ListenerUpdate) { + if m.logger.V(2) { + m.logger.Infof("Received update for Listener resource %q: %v", m.ldsResourceName, pretty.ToJSON(update)) + } + + m.currentListenerUpdate = update + + if update.InlineRouteConfig != nil { + // If there was a previous route config watcher because of a non-inline + // route configuration, cancel it. + m.rdsResourceName = "" + if m.routeConfigWatcher != nil { + m.routeConfigWatcher.stop() + m.routeConfigWatcher = nil + } + + m.applyRouteConfigUpdate(*update.InlineRouteConfig) + return + } + + // We get here only if there was no inline route configuration. + + // If the route config name has not changed, send an update with existing + // route configuration and the newly received listener configuration. + if m.rdsResourceName == update.RouteConfigName { + m.maybeSendUpdate() + return + } + + // If the route config name has changed, cancel the old watcher and start a + // new one. At this point, since we have not yet resolved the new route + // config name, we don't send an update to the channel, and therefore + // continue using the old route configuration (if received) until the new + // one is received. + m.rdsResourceName = update.RouteConfigName + if m.routeConfigWatcher != nil { + m.routeConfigWatcher.stop() + m.currentVirtualHost = nil + } + m.routeConfigWatcher = newRouteConfigWatcher(m.rdsResourceName, m) +} + +// Only executed in the context of a serializer callback. +func (m *DependencyManager) onListenerResourceError(err error) { + if m.logger.V(2) { + m.logger.Infof("Received resource error for Listener resource %q: %v", m.ldsResourceName, err) + } + + if m.routeConfigWatcher != nil { + m.routeConfigWatcher.stop() + } + m.rdsResourceName = "" + m.currentVirtualHost = nil + m.routeConfigWatcher = nil + m.watcher.OnError(status.Errorf(codes.Unavailable, "Listener resource error : %v", err)) +} + +// Only executed in the context of a serializer callback. +func (m *DependencyManager) onListenerResourceAmbientError(err error) { + m.logger.Warningf("Listener resource ambient error: %v", err) +} + +// Only executed in the context of a serializer callback. +func (m *DependencyManager) onRouteConfigResourceUpdate(resourceName string, update xdsresource.RouteConfigUpdate) { + if m.logger.V(2) { + m.logger.Infof("Received update for RouteConfiguration resource %q: %v", resourceName, pretty.ToJSON(update)) + } + + if m.rdsResourceName != resourceName { + // Drop updates from canceled watchers. + return + } + m.applyRouteConfigUpdate(update) +} + +// Only executed in the context of a serializer callback. +func (m *DependencyManager) onRouteConfigResourceError(resourceName string, err error) { + if m.logger.V(2) { + m.logger.Infof("Received resource error for RouteConfiguration resource %q: %v", resourceName, err) + } + + //If update is not for the current watcher + if m.rdsResourceName != resourceName { + return + } + m.watcher.OnError(status.Errorf(codes.Unavailable, "Route resource error : %v", err)) +} + +// Only executed in the context of a serializer callback. +func (m *DependencyManager) onRouteConfigResourceAmbientError(resourceName string, err error) { + m.logger.Warningf("Route resource ambient error %q: %v", resourceName, err) +} diff --git a/internal/xds/xdsdependencymanager/xds_dependency_manager_test.go b/internal/xds/xdsdependencymanager/xds_dependency_manager_test.go new file mode 100644 index 000000000000..04e22466a25d --- /dev/null +++ b/internal/xds/xdsdependencymanager/xds_dependency_manager_test.go @@ -0,0 +1,769 @@ +/* + * + * Copyright 2025 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package xdsdepmgr + +import ( + "context" + "fmt" + "strings" + "testing" + "time" + + "github.com/envoyproxy/go-control-plane/pkg/wellknown" + "github.com/google/go-cmp/cmp" + "github.com/google/go-cmp/cmp/cmpopts" + "github.com/google/uuid" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/internal/grpctest" + "google.golang.org/grpc/internal/pretty" + "google.golang.org/grpc/internal/testutils" + "google.golang.org/grpc/internal/testutils/xds/e2e" + "google.golang.org/grpc/internal/xds/bootstrap" + "google.golang.org/grpc/internal/xds/xdsclient" + "google.golang.org/grpc/internal/xds/xdsclient/xdsresource" + "google.golang.org/grpc/status" + + v3listenerpb "github.com/envoyproxy/go-control-plane/envoy/config/listener/v3" + v3routepb "github.com/envoyproxy/go-control-plane/envoy/config/route/v3" + v3routerpb "github.com/envoyproxy/go-control-plane/envoy/extensions/filters/http/router/v3" + v3httppb "github.com/envoyproxy/go-control-plane/envoy/extensions/filters/network/http_connection_manager/v3" + + _ "google.golang.org/grpc/internal/xds/httpfilter/router" // Register the router filter +) + +type s struct { + grpctest.Tester +} + +func Test(t *testing.T) { + grpctest.RunSubTests(t, s{}) +} + +const ( + defaultTestTimeout = 10 * time.Second + defaultTestShortTimeout = 100 * time.Microsecond + + defaultTestServiceName = "service-name" + defaultTestRouteConfigName = "route-config-name" + defaultTestClusterName = "cluster-name" +) + +var wantXdsConfig = xdsresource.XDSConfig{ + Listener: xdsresource.ListenerUpdate{ + RouteConfigName: defaultTestRouteConfigName, + HTTPFilters: []xdsresource.HTTPFilter{{Name: "router"}}}, + RouteConfig: xdsresource.RouteConfigUpdate{ + VirtualHosts: []*xdsresource.VirtualHost{ + { + Domains: []string{defaultTestServiceName}, + Routes: []*xdsresource.Route{{Prefix: newStringP("/"), + WeightedClusters: map[string]xdsresource.WeightedCluster{defaultTestClusterName: {Weight: 100}}, + ActionType: xdsresource.RouteActionRoute}}, + }, + }, + }, + VirtualHost: &xdsresource.VirtualHost{ + Domains: []string{defaultTestServiceName}, + Routes: []*xdsresource.Route{{Prefix: newStringP("/"), + WeightedClusters: map[string]xdsresource.WeightedCluster{defaultTestClusterName: {Weight: 100}}, + ActionType: xdsresource.RouteActionRoute}}, + }, +} + +func newStringP(s string) *string { + return &s +} + +// testWatcher is a mock implementation of the ConfigWatcher interface +// that allows defining custom logic for its methods in each test. +type testWatcher struct { + onUpdate func(xdsresource.XDSConfig) + onError func(error) +} + +// OnUpdate calls the underlying onUpdate function if it's not nil. +func (w *testWatcher) OnUpdate(cfg xdsresource.XDSConfig) { + if w.onUpdate != nil { + w.onUpdate(cfg) + } +} + +// OnError calls the underlying onError function if it's not nil. +func (w *testWatcher) OnError(err error) { + if w.onError != nil { + w.onError(err) + } +} + +func verifyError(gotErr error, wantCode codes.Code, wantErr, wantNodeID string) error { + if gotErr == nil { + return fmt.Errorf("got nil error from resolver, want error with code %v", wantCode) + } + if !strings.Contains(gotErr.Error(), wantErr) { + return fmt.Errorf("got error from resolver %q, want %q", gotErr, wantErr) + } + if gotCode := status.Code(gotErr); gotCode != wantCode { + return fmt.Errorf("got error from resolver with code %v, want %v", gotCode, wantCode) + } + if !strings.Contains(gotErr.Error(), wantNodeID) { + return fmt.Errorf("got error from resolver %q, want nodeID %q", gotErr, wantNodeID) + } + return nil +} + +// newDependencyManagerForTest creates a new DependencyManager for testing purposes. +func newDependencyManagerForTest(t *testing.T, listenerName string, target string, bootstrapContents []byte, watcher ConfigWatcher) *DependencyManager { + t.Helper() + + // Setup the bootstrap file contents. + config, err := bootstrap.NewConfigFromContents(bootstrapContents) + if err != nil { + t.Fatalf("Failed to parse bootstrap contents: %s, %v", string(bootstrapContents), err) + } + + pool := xdsclient.NewPool(config) + if err != nil { + t.Fatalf("Failed to create an xDS client pool: %v", err) + } + c, cancel, err := pool.NewClientForTesting(xdsclient.OptionsForTesting{ + Name: t.Name(), + WatchExpiryTimeout: defaultTestTimeout, + }) + if err != nil { + t.Fatalf("Failed to create an xDS client: %v", err) + } + t.Cleanup(cancel) + + return New(listenerName, target, c, watcher) +} + +// Spins up an xDS management server and sets up an xDS bootstrap configuration +// file that points to it. +// +// Returns the following: +// - A reference to the xDS management server +// - Contents of the bootstrap configuration pointing to xDS management +// server +func setupManagementServerForTest(t *testing.T, nodeID string) (*e2e.ManagementServer, []byte) { + t.Helper() + + mgmtServer := e2e.StartManagementServer(t, e2e.ManagementServerOptions{}) + bootstrapContents := e2e.DefaultBootstrapContents(t, nodeID, mgmtServer.Address) + return mgmtServer, bootstrapContents + +} + +// Tests the happy case where the dependency manager receives all the required +// resources and calls OnUpdate with the correct XDSConfig. +func (s) TestHappyCase(t *testing.T) { + nodeID := uuid.New().String() + mgmtServer, bc := setupManagementServerForTest(t, nodeID) + defer mgmtServer.Stop() + + updateCh := make(chan xdsresource.XDSConfig, 1) + watcher := &testWatcher{ + onUpdate: func(cfg xdsresource.XDSConfig) { + updateCh <- cfg + }, + onError: func(err error) { + t.Errorf("Received unexpected error from dependency manager: %v", err) + }, + } + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + listener := e2e.DefaultClientListener(defaultTestServiceName, defaultTestRouteConfigName) + route := e2e.DefaultRouteConfig(defaultTestRouteConfigName, defaultTestServiceName, defaultTestClusterName) + resources := e2e.UpdateOptions{ + NodeID: nodeID, + Listeners: []*v3listenerpb.Listener{listener}, + Routes: []*v3routepb.RouteConfiguration{route}, + SkipValidation: true, + } + if err := mgmtServer.Update(ctx, resources); err != nil { + t.Fatal(err) + } + dm := newDependencyManagerForTest(t, defaultTestServiceName, defaultTestServiceName, bc, watcher) + defer dm.Close() + + cmpOpts := []cmp.Option{ + cmpopts.EquateEmpty(), + cmpopts.IgnoreFields(xdsresource.HTTPFilter{}, "Filter", "Config"), + cmpopts.IgnoreFields(xdsresource.ListenerUpdate{}, "Raw"), + cmpopts.IgnoreFields(xdsresource.RouteConfigUpdate{}, "Raw"), + } + + select { + case <-ctx.Done(): + t.Fatal("Timeout waiting for update from dependency manager") + case update := <-updateCh: + if diff := cmp.Diff(update, wantXdsConfig, cmpOpts...); diff != "" { + t.Fatalf("Did not receive expected update from dependency manager,. Diff (-got +want):\n%v", diff) + + } + } +} + +// Tests the case where the listener contains an inline route configuration and +// verifies that OnUpdate is called with the correct XDSConfig. +func (s) TestInlineRouteConfig(t *testing.T) { + nodeID := uuid.New().String() + mgmtServer, bc := setupManagementServerForTest(t, nodeID) + defer mgmtServer.Stop() + + updateCh := make(chan xdsresource.XDSConfig, 1) + watcher := &testWatcher{ + onUpdate: func(cfg xdsresource.XDSConfig) { + updateCh <- cfg + }, + onError: func(err error) { + t.Errorf("Received unexpected error from dependency manager: %v", err) + }, + } + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + + hcm := testutils.MarshalAny(t, &v3httppb.HttpConnectionManager{ + RouteSpecifier: &v3httppb.HttpConnectionManager_RouteConfig{ + RouteConfig: e2e.DefaultRouteConfig(defaultTestRouteConfigName, defaultTestServiceName, defaultTestClusterName), + }, + HttpFilters: []*v3httppb.HttpFilter{e2e.HTTPFilter("router", &v3routerpb.Router{})}, // router fields are unused by grpc + }) + listener := &v3listenerpb.Listener{ + Name: defaultTestServiceName, + ApiListener: &v3listenerpb.ApiListener{ApiListener: hcm}, + FilterChains: []*v3listenerpb.FilterChain{{ + Name: "filter-chain-name", + Filters: []*v3listenerpb.Filter{{ + Name: wellknown.HTTPConnectionManager, + ConfigType: &v3listenerpb.Filter_TypedConfig{TypedConfig: hcm}, + }}, + }}, + } + resources := e2e.UpdateOptions{ + NodeID: nodeID, + Listeners: []*v3listenerpb.Listener{listener}, + SkipValidation: true, + } + if err := mgmtServer.Update(ctx, resources); err != nil { + t.Fatal(err) + } + dm := newDependencyManagerForTest(t, defaultTestServiceName, defaultTestServiceName, bc, watcher) + defer dm.Close() + + wantConfig := wantXdsConfig + wantConfig.Listener.RouteConfigName = "" + wantConfig.Listener.InlineRouteConfig = &xdsresource.RouteConfigUpdate{ + VirtualHosts: []*xdsresource.VirtualHost{ + { + Domains: []string{defaultTestServiceName}, + Routes: []*xdsresource.Route{{Prefix: newStringP("/"), + WeightedClusters: map[string]xdsresource.WeightedCluster{defaultTestClusterName: {Weight: 100}}, + ActionType: xdsresource.RouteActionRoute}}, + }, + }, + } + cmpOpts := []cmp.Option{ + cmpopts.EquateEmpty(), + cmpopts.IgnoreFields(xdsresource.HTTPFilter{}, "Filter", "Config"), + cmpopts.IgnoreFields(xdsresource.ListenerUpdate{}, "Raw"), + cmpopts.IgnoreFields(xdsresource.RouteConfigUpdate{}, "Raw"), + } + select { + case <-ctx.Done(): + t.Fatal("Timeout waiting for update from dependency manager") + case update := <-updateCh: + if diff := cmp.Diff(update, wantConfig, cmpOpts...); diff != "" { + t.Fatalf("Did not receive expected update from dependency manager,. Diff (-got +want):\n%v", diff) + } + } +} + +// Tests the case where dependency manager only receives listener resource but +// does not receive route config resource. Verfies that OnUpdate is not called +// since we do not have all resources. +func (s) TestIncompleteResources(t *testing.T) { + nodeID := uuid.New().String() + mgmtServer, bc := setupManagementServerForTest(t, nodeID) + defer mgmtServer.Stop() + + updateCh := make(chan xdsresource.XDSConfig, 1) + watcher := &testWatcher{ + onUpdate: func(cfg xdsresource.XDSConfig) { + updateCh <- cfg + }, + onError: func(err error) { + t.Errorf("Received unexpected error from dependency manager: %v", err) + }, + } + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + + listener := e2e.DefaultClientListener(defaultTestServiceName, defaultTestRouteConfigName) + if err := mgmtServer.Update(ctx, e2e.UpdateOptions{ + NodeID: nodeID, + Listeners: []*v3listenerpb.Listener{listener}, + SkipValidation: true, + }); err != nil { + t.Fatal(err) + } + dm := newDependencyManagerForTest(t, defaultTestServiceName, defaultTestServiceName, bc, watcher) + defer dm.Close() + + sCtx, sCancel := context.WithTimeout(ctx, defaultTestShortTimeout) + defer sCancel() + select { + case <-sCtx.Done(): + case update := <-updateCh: + t.Fatalf("Received unexpected update from dependency manager: %v", pretty.ToJSON(update)) + } +} + +// Tests the case where dependency manager receives a listener resource error by +// sending the correct update first and then removing the listener resource. It +// verifies that OnError is called with the correct error. +func (s) TestListenerResourceError(t *testing.T) { + nodeID := uuid.New().String() + mgmtServer, bc := setupManagementServerForTest(t, nodeID) + defer mgmtServer.Stop() + + errorCh := make(chan error, 1) + updateCh := make(chan xdsresource.XDSConfig, 1) + watcher := &testWatcher{ + onUpdate: func(cfg xdsresource.XDSConfig) { + updateCh <- cfg + }, + onError: func(err error) { + errorCh <- err + }, + } + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + + // Send a correct update first + listener := e2e.DefaultClientListener(defaultTestServiceName, defaultTestRouteConfigName) + route := e2e.DefaultRouteConfig(defaultTestRouteConfigName, defaultTestServiceName, defaultTestClusterName) + resources := e2e.UpdateOptions{ + NodeID: nodeID, + Listeners: []*v3listenerpb.Listener{listener}, + Routes: []*v3routepb.RouteConfiguration{route}, + SkipValidation: true, + } + if err := mgmtServer.Update(ctx, resources); err != nil { + t.Fatal(err) + } + + dm := newDependencyManagerForTest(t, defaultTestServiceName, defaultTestServiceName, bc, watcher) + defer dm.Close() + + cmpOpts := []cmp.Option{ + cmpopts.EquateEmpty(), + cmpopts.IgnoreFields(xdsresource.HTTPFilter{}, "Filter", "Config"), + cmpopts.IgnoreFields(xdsresource.ListenerUpdate{}, "Raw"), + cmpopts.IgnoreFields(xdsresource.RouteConfigUpdate{}, "Raw"), + } + select { + case <-ctx.Done(): + t.Fatal("Timeout waiting for update from dependency manager") + case update := <-updateCh: + if diff := cmp.Diff(wantXdsConfig, update, cmpOpts...); diff != "" { + t.Fatalf("Did not receive expected update from dependency manager,. Diff (-got +want):\n%v", diff) + } + } + + // Remove listener resource so that we get listener resource error. + resources.Listeners = nil + if err := mgmtServer.Update(ctx, resources); err != nil { + t.Fatal(err) + } + select { + case err := <-errorCh: + if err := verifyError(err, codes.Unavailable, fmt.Sprintf("xds: resource %q of type %q has been removed", defaultTestServiceName, "ListenerResource"), nodeID); err != nil { + t.Fatal(err) + } + case <-ctx.Done(): + t.Fatal("Timeout waiting for error from dependency manager") + } +} + +// Tests the case where dependency manager receives a route config resource +// error by sending a route resource that is NACKed by the XDSClient. It +// verifies that OnError is called with correct error. +func (s) TestRouteResourceError(t *testing.T) { + nodeID := uuid.New().String() + mgmtServer, bc := setupManagementServerForTest(t, nodeID) + defer mgmtServer.Stop() + + errorCh := make(chan error, 1) + watcher := &testWatcher{ + onUpdate: func(xdsresource.XDSConfig) { + t.Errorf("Received unexpected update from dependency manager") + }, + onError: func(err error) { + errorCh <- err + }, + } + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + + listener := e2e.DefaultClientListener(defaultTestServiceName, defaultTestRouteConfigName) + route := e2e.DefaultRouteConfig(defaultTestRouteConfigName, defaultTestServiceName, defaultTestClusterName) + // Remove the Match to make sure the route resource is NACKed by XDSClient + // sending a route resource error to dependency manager. + route.VirtualHosts[0].Routes[0].Match = nil + resources := e2e.UpdateOptions{ + NodeID: nodeID, + Listeners: []*v3listenerpb.Listener{listener}, + Routes: []*v3routepb.RouteConfiguration{route}, + SkipValidation: true, + } + if err := mgmtServer.Update(ctx, resources); err != nil { + t.Fatal(err) + } + + dm := newDependencyManagerForTest(t, defaultTestServiceName, defaultTestServiceName, bc, watcher) + defer dm.Close() + + select { + case err := <-errorCh: + if err := verifyError(err, codes.Unavailable, "Route resource error", nodeID); err != nil { + t.Fatal(err) + } + case <-ctx.Done(): + t.Fatal("Timeout waiting for error from dependency manager") + } +} + +// Tests the case where route config updates receives does not have any virtual +// host. Verifies that OnError is called with correct error. +func (s) TestNoVirtualHost(t *testing.T) { + nodeID := uuid.New().String() + mgmtServer, bc := setupManagementServerForTest(t, nodeID) + defer mgmtServer.Stop() + + errorCh := make(chan error, 1) + watcher := &testWatcher{ + onUpdate: func(xdsresource.XDSConfig) { + t.Errorf("Received unexpected update from dependency manager") + }, + onError: func(err error) { + errorCh <- err + }, + } + + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + + listener := e2e.DefaultClientListener(defaultTestServiceName, defaultTestRouteConfigName) + route := e2e.DefaultRouteConfig(defaultTestRouteConfigName, defaultTestServiceName, defaultTestClusterName) + route.VirtualHosts = nil + resources := e2e.UpdateOptions{ + NodeID: nodeID, + Listeners: []*v3listenerpb.Listener{listener}, + Routes: []*v3routepb.RouteConfiguration{route}, + SkipValidation: true, + } + if err := mgmtServer.Update(ctx, resources); err != nil { + t.Fatal(err) + } + dm := newDependencyManagerForTest(t, defaultTestServiceName, defaultTestServiceName, bc, watcher) + defer dm.Close() + + select { + case err := <-errorCh: + if err := verifyError(err, codes.Unavailable, "Could not find VirtualHost", ""); err != nil { + t.Fatal(err) + } + case <-ctx.Done(): + t.Fatal("Timeout waiting for error from dependency manager") + } +} + +// Tests the case where we get a listener resource ambient error and verify that +// we correctly log the warning for it. To make sure we get a listener ambient +// error, we send a correct update first , then send an invalid one and then +// send the valid resource again. We send the valid resource again so that we +// can be sure the abmient error reaches the dependency manager since there is +// no other way to wait for it . +func (s) TestListenerResourceAmbientError(t *testing.T) { + grpctest.ExpectWarning("Listener resource ambient error") + nodeID := uuid.New().String() + mgmtServer, bc := setupManagementServerForTest(t, nodeID) + defer mgmtServer.Stop() + + updateCh := make(chan xdsresource.XDSConfig, 1) + errorCh := make(chan error, 1) + watcher := &testWatcher{ + onUpdate: func(cfg xdsresource.XDSConfig) { + updateCh <- cfg + }, + onError: func(err error) { + errorCh <- err + }, + } + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + + // Configure a valid listener and route. + listener := e2e.DefaultClientListener(defaultTestServiceName, defaultTestRouteConfigName) + route := e2e.DefaultRouteConfig(defaultTestRouteConfigName, defaultTestServiceName, defaultTestClusterName) + resources := e2e.UpdateOptions{ + NodeID: nodeID, + Listeners: []*v3listenerpb.Listener{listener}, + Routes: []*v3routepb.RouteConfiguration{route}, + SkipValidation: true, + } + if err := mgmtServer.Update(ctx, resources); err != nil { + t.Fatal(err) + } + + dm := newDependencyManagerForTest(t, defaultTestServiceName, defaultTestServiceName, bc, watcher) + defer dm.Close() + + cmpOpts := []cmp.Option{ + cmpopts.EquateEmpty(), + cmpopts.IgnoreFields(xdsresource.HTTPFilter{}, "Filter", "Config"), + cmpopts.IgnoreFields(xdsresource.ListenerUpdate{}, "Raw"), + cmpopts.IgnoreFields(xdsresource.RouteConfigUpdate{}, "Raw"), + } + // Wait for the initial valid update. + select { + case <-ctx.Done(): + t.Fatal("Timeout waiting for initial update from dependency manager") + case update := <-updateCh: + if diff := cmp.Diff(wantXdsConfig, update, cmpOpts...); diff != "" { + t.Fatalf("Did not receive expected update from dependency manager,. Diff (-got +want):\n%v", diff) + } + } + + // Now, send an invalid listener resource. Since a valid one is already + // cached, this should result in an ambient error. + hcm := testutils.MarshalAny(t, &v3httppb.HttpConnectionManager{ + HttpFilters: []*v3httppb.HttpFilter{e2e.HTTPFilter("router", &v3routerpb.Router{})}, + }) + lis := &v3listenerpb.Listener{ + Name: defaultTestServiceName, + ApiListener: &v3listenerpb.ApiListener{ApiListener: hcm}, + FilterChains: []*v3listenerpb.FilterChain{{ + Name: "filter-chain-name", + Filters: []*v3listenerpb.Filter{{ + Name: wellknown.HTTPConnectionManager, + ConfigType: &v3listenerpb.Filter_TypedConfig{TypedConfig: hcm}, + }}, + }}, + } + resources.Listeners = []*v3listenerpb.Listener{lis} + resources.Routes = nil + if err := mgmtServer.Update(ctx, resources); err != nil { + t.Fatal(err) + } + + // We expect no call to OnError or OnUpdate on our watcher. We just wait for + // a short duration to ensure that. + sCtx, sCancel := context.WithTimeout(ctx, defaultTestShortTimeout) + defer sCancel() + select { + case err := <-errorCh: + t.Fatalf("Unexpected call to OnError %v", err) + case update := <-updateCh: + t.Fatalf("Unexpected call to OnUpdate %v", pretty.ToJSON(update)) + case <-sCtx.Done(): + } + + // Send valide resources again + listener = e2e.DefaultClientListener(defaultTestServiceName, defaultTestRouteConfigName) + route = e2e.DefaultRouteConfig(defaultTestRouteConfigName, defaultTestServiceName, defaultTestClusterName) + resources = e2e.UpdateOptions{ + NodeID: nodeID, + Listeners: []*v3listenerpb.Listener{listener}, + Routes: []*v3routepb.RouteConfiguration{route}, + SkipValidation: true, + } + if err := mgmtServer.Update(ctx, resources); err != nil { + t.Fatal(err) + } + select { + case <-ctx.Done(): + t.Fatal("Timeout waiting for initial update from dependency manager") + case update := <-updateCh: + if diff := cmp.Diff(wantXdsConfig, update, cmpOpts...); diff != "" { + t.Fatalf("Did not receive expected update from dependency manager,. Diff (-got +want):\n%v", diff) + } + } +} + +// Tests the case where we get a route resource ambient error and verify that we +// correctly log the warning for it. To make sure we get a route resource +// ambient error, we send a correct update first , then send an invalid one and +// then send the valid resource again. We send the valid resource again so that +// we can be sure the abmient error reaches the dependency manager since there +// is no other way to wait for it . +func (s) TestRouteResourceAmbientError(t *testing.T) { + grpctest.ExpectWarning("Route resource ambient error") + nodeID := uuid.New().String() + mgmtServer, bc := setupManagementServerForTest(t, nodeID) + defer mgmtServer.Stop() + + updateCh := make(chan xdsresource.XDSConfig, 1) + errorCh := make(chan error, 1) + watcher := &testWatcher{ + onUpdate: func(cfg xdsresource.XDSConfig) { + updateCh <- cfg + }, + onError: func(err error) { + errorCh <- err + }, + } + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + + // Configure a valid listener and route. + listener := e2e.DefaultClientListener(defaultTestServiceName, defaultTestRouteConfigName) + route := e2e.DefaultRouteConfig(defaultTestRouteConfigName, defaultTestServiceName, defaultTestClusterName) + resources := e2e.UpdateOptions{ + NodeID: nodeID, + Listeners: []*v3listenerpb.Listener{listener}, + Routes: []*v3routepb.RouteConfiguration{route}, + SkipValidation: true, + } + if err := mgmtServer.Update(ctx, resources); err != nil { + t.Fatal(err) + } + + dm := newDependencyManagerForTest(t, defaultTestServiceName, defaultTestServiceName, bc, watcher) + defer dm.Close() + + cmpOpts := []cmp.Option{ + cmpopts.EquateEmpty(), + cmpopts.IgnoreFields(xdsresource.HTTPFilter{}, "Filter", "Config"), + cmpopts.IgnoreFields(xdsresource.ListenerUpdate{}, "Raw"), + cmpopts.IgnoreFields(xdsresource.RouteConfigUpdate{}, "Raw"), + } + // Wait for the initial valid update. + select { + case <-ctx.Done(): + t.Fatal("Timeout waiting for initial update from dependency manager") + case update := <-updateCh: + if diff := cmp.Diff(wantXdsConfig, update, cmpOpts...); diff != "" { + t.Fatalf("Did not receive expected update from dependency manager,. Diff (-got +want):\n%v", diff) + } + } + + // Make the route resource invalid + resources.Routes[0].VirtualHosts[0].Routes[0].Match = nil + if err := mgmtServer.Update(ctx, resources); err != nil { + t.Fatal(err) + } + + // We expect no call to OnError or OnUpdate on our watcher. We just wait for + // a short duration to ensure that. + sCtx, sCancel := context.WithTimeout(ctx, defaultTestShortTimeout) + defer sCancel() + select { + case err := <-errorCh: + t.Fatalf("Unexpected call to OnError %v", err) + case update := <-updateCh: + t.Fatalf("Unexpected call to OnUpdate %v", pretty.ToJSON(update)) + case <-sCtx.Done(): + } + + // Send valide resources again + listener = e2e.DefaultClientListener(defaultTestServiceName, defaultTestRouteConfigName) + route = e2e.DefaultRouteConfig(defaultTestRouteConfigName, defaultTestServiceName, defaultTestClusterName) + resources = e2e.UpdateOptions{ + NodeID: nodeID, + Listeners: []*v3listenerpb.Listener{listener}, + Routes: []*v3routepb.RouteConfiguration{route}, + SkipValidation: true, + } + if err := mgmtServer.Update(ctx, resources); err != nil { + t.Fatal(err) + } + select { + case <-ctx.Done(): + t.Fatal("Timeout waiting for initial update from dependency manager") + case update := <-updateCh: + if diff := cmp.Diff(wantXdsConfig, update, cmpOpts...); diff != "" { + t.Fatalf("Did not receive expected update from dependency manager,. Diff (-got +want):\n%v", diff) + } + } +} + +// Tests the case where the cluster name changes in the route resource update +// and verify that each time OnUpdate is called with correct cluster name. +func (s) TestRouteResourceUpdate(t *testing.T) { + nodeID := uuid.New().String() + mgmtServer, bc := setupManagementServerForTest(t, nodeID) + defer mgmtServer.Stop() + + updateCh := make(chan xdsresource.XDSConfig, 1) + watcher := &testWatcher{ + onUpdate: func(cfg xdsresource.XDSConfig) { + updateCh <- cfg + }, + onError: func(err error) { + t.Errorf("Received unexpected error from dependency manager: %v", err) + }, + } + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + + // Initial resources with defaultTestClusterName + listener := e2e.DefaultClientListener(defaultTestServiceName, defaultTestRouteConfigName) + route := e2e.DefaultRouteConfig(defaultTestRouteConfigName, defaultTestServiceName, defaultTestClusterName) + resources := e2e.UpdateOptions{ + NodeID: nodeID, + Listeners: []*v3listenerpb.Listener{listener}, + Routes: []*v3routepb.RouteConfiguration{route}, + SkipValidation: true, + } + if err := mgmtServer.Update(ctx, resources); err != nil { + t.Fatal(err) + } + + dm := newDependencyManagerForTest(t, defaultTestServiceName, defaultTestServiceName, bc, watcher) + defer dm.Close() + + // Wait for the first update. + select { + case <-ctx.Done(): + t.Fatal("Timeout waiting for initial update from dependency manager") + case update := <-updateCh: + if gotCluster := update.VirtualHost.Routes[0].WeightedClusters[defaultTestClusterName]; gotCluster.Weight != 100 { + t.Fatalf("Update has wrong cluster, got: %v, want: %v", update.VirtualHost.Routes[0].WeightedClusters, defaultTestClusterName) + } + } + + // Update route to point to a new cluster. + const newClusterName = "new-cluster-name" + route2 := e2e.DefaultRouteConfig(defaultTestRouteConfigName, defaultTestServiceName, newClusterName) + resources.Routes = []*v3routepb.RouteConfiguration{route2} + if err := mgmtServer.Update(ctx, resources); err != nil { + t.Fatal(err) + } + + // Wait for the second update and verify it has the new cluster. + select { + case <-ctx.Done(): + t.Fatal("Timeout waiting for route update from dependency manager") + case update := <-updateCh: + if gotCluster := update.VirtualHost.Routes[0].WeightedClusters[newClusterName]; gotCluster.Weight != 100 { + t.Fatalf("Second update has wrong cluster, got: %v, want: %v", update.VirtualHost.Routes[0].WeightedClusters, newClusterName) + } + } +} From 66270eeeca14ae23e002e5bbac7710e5451c4aa9 Mon Sep 17 00:00:00 2001 From: eshitachandwani Date: Tue, 14 Oct 2025 22:12:12 +0530 Subject: [PATCH 02/11] remove const --- .../xds/xdsdependencymanager/xds_dependency_manager_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/xds/xdsdependencymanager/xds_dependency_manager_test.go b/internal/xds/xdsdependencymanager/xds_dependency_manager_test.go index 04e22466a25d..b72918d35034 100644 --- a/internal/xds/xdsdependencymanager/xds_dependency_manager_test.go +++ b/internal/xds/xdsdependencymanager/xds_dependency_manager_test.go @@ -750,7 +750,7 @@ func (s) TestRouteResourceUpdate(t *testing.T) { } // Update route to point to a new cluster. - const newClusterName = "new-cluster-name" + newClusterName := "new-cluster-name" route2 := e2e.DefaultRouteConfig(defaultTestRouteConfigName, defaultTestServiceName, newClusterName) resources.Routes = []*v3routepb.RouteConfiguration{route2} if err := mgmtServer.Update(ctx, resources); err != nil { From d80ee7a02f3c8267c41184e2a7112b2e046bc8f0 Mon Sep 17 00:00:00 2001 From: eshitachandwani Date: Wed, 15 Oct 2025 11:43:45 +0530 Subject: [PATCH 03/11] Change log --- .../xds/xdsdependencymanager/xds_dependency_manager_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/xds/xdsdependencymanager/xds_dependency_manager_test.go b/internal/xds/xdsdependencymanager/xds_dependency_manager_test.go index b72918d35034..2ce1a0e2a642 100644 --- a/internal/xds/xdsdependencymanager/xds_dependency_manager_test.go +++ b/internal/xds/xdsdependencymanager/xds_dependency_manager_test.go @@ -696,7 +696,7 @@ func (s) TestRouteResourceAmbientError(t *testing.T) { } select { case <-ctx.Done(): - t.Fatal("Timeout waiting for initial update from dependency manager") + t.Fatal("Timeout waiting for update from dependency manager") case update := <-updateCh: if diff := cmp.Diff(wantXdsConfig, update, cmpOpts...); diff != "" { t.Fatalf("Did not receive expected update from dependency manager,. Diff (-got +want):\n%v", diff) From 3502dbff923133170b747e16f3303d25d25a04b5 Mon Sep 17 00:00:00 2001 From: eshitachandwani Date: Wed, 15 Oct 2025 11:53:16 +0530 Subject: [PATCH 04/11] resolve conflicts --- internal/xds/xdsdependencymanager/watch_service.go | 4 ++-- .../xds/xdsdependencymanager/xds_dependency_manager.go | 6 +++--- .../xds_dependency_manager_test.go | 10 +++++----- 3 files changed, 10 insertions(+), 10 deletions(-) diff --git a/internal/xds/xdsdependencymanager/watch_service.go b/internal/xds/xdsdependencymanager/watch_service.go index 3b3c65ba29db..fb28c517c2be 100644 --- a/internal/xds/xdsdependencymanager/watch_service.go +++ b/internal/xds/xdsdependencymanager/watch_service.go @@ -36,8 +36,8 @@ func newListenerWatcher(resourceName string, parent *DependencyManager) *listene return lw } -func (l *listenerWatcher) ResourceChanged(update *xdsresource.ListenerResourceData, onDone func()) { - handleUpdate := func(context.Context) { l.parent.onListenerResourceUpdate(update.Resource); onDone() } +func (l *listenerWatcher) ResourceChanged(update *xdsresource.ListenerUpdate, onDone func()) { + handleUpdate := func(context.Context) { l.parent.onListenerResourceUpdate(update); onDone() } l.parent.serializer.ScheduleOr(handleUpdate, onDone) } diff --git a/internal/xds/xdsdependencymanager/xds_dependency_manager.go b/internal/xds/xdsdependencymanager/xds_dependency_manager.go index 3234c3e9959f..967c45194f7e 100644 --- a/internal/xds/xdsdependencymanager/xds_dependency_manager.go +++ b/internal/xds/xdsdependencymanager/xds_dependency_manager.go @@ -55,7 +55,7 @@ type DependencyManager struct { ldsResourceName string listenerWatcher *listenerWatcher - currentListenerUpdate xdsresource.ListenerUpdate + currentListenerUpdate *xdsresource.ListenerUpdate rdsResourceName string currentRouteConfig xdsresource.RouteConfigUpdate @@ -129,7 +129,7 @@ func (m *DependencyManager) maybeSendUpdate() { m.logger.Infof("Sending update to watcher: Listener: %v, RouteConfig: %v", pretty.ToJSON(m.currentListenerUpdate), pretty.ToJSON(m.currentRouteConfig)) } m.watcher.OnUpdate(xdsresource.XDSConfig{ - Listener: m.currentListenerUpdate, + Listener: *m.currentListenerUpdate, RouteConfig: m.currentRouteConfig, VirtualHost: m.currentVirtualHost, }) @@ -147,7 +147,7 @@ func (m *DependencyManager) applyRouteConfigUpdate(update xdsresource.RouteConfi } // Only executed in the context of a serializer callback. -func (m *DependencyManager) onListenerResourceUpdate(update xdsresource.ListenerUpdate) { +func (m *DependencyManager) onListenerResourceUpdate(update *xdsresource.ListenerUpdate) { if m.logger.V(2) { m.logger.Infof("Received update for Listener resource %q: %v", m.ldsResourceName, pretty.ToJSON(update)) } diff --git a/internal/xds/xdsdependencymanager/xds_dependency_manager_test.go b/internal/xds/xdsdependencymanager/xds_dependency_manager_test.go index 2ce1a0e2a642..41b0e3f20ef3 100644 --- a/internal/xds/xdsdependencymanager/xds_dependency_manager_test.go +++ b/internal/xds/xdsdependencymanager/xds_dependency_manager_test.go @@ -73,7 +73,7 @@ var wantXdsConfig = xdsresource.XDSConfig{ { Domains: []string{defaultTestServiceName}, Routes: []*xdsresource.Route{{Prefix: newStringP("/"), - WeightedClusters: map[string]xdsresource.WeightedCluster{defaultTestClusterName: {Weight: 100}}, + WeightedClusters: []xdsresource.WeightedCluster{{Name: defaultTestClusterName, Weight: 100}}, ActionType: xdsresource.RouteActionRoute}}, }, }, @@ -81,7 +81,7 @@ var wantXdsConfig = xdsresource.XDSConfig{ VirtualHost: &xdsresource.VirtualHost{ Domains: []string{defaultTestServiceName}, Routes: []*xdsresource.Route{{Prefix: newStringP("/"), - WeightedClusters: map[string]xdsresource.WeightedCluster{defaultTestClusterName: {Weight: 100}}, + WeightedClusters: []xdsresource.WeightedCluster{{Name: defaultTestClusterName, Weight: 100}}, ActionType: xdsresource.RouteActionRoute}}, }, } @@ -273,7 +273,7 @@ func (s) TestInlineRouteConfig(t *testing.T) { { Domains: []string{defaultTestServiceName}, Routes: []*xdsresource.Route{{Prefix: newStringP("/"), - WeightedClusters: map[string]xdsresource.WeightedCluster{defaultTestClusterName: {Weight: 100}}, + WeightedClusters: []xdsresource.WeightedCluster{{Name: defaultTestClusterName, Weight: 100}}, ActionType: xdsresource.RouteActionRoute}}, }, }, @@ -744,7 +744,7 @@ func (s) TestRouteResourceUpdate(t *testing.T) { case <-ctx.Done(): t.Fatal("Timeout waiting for initial update from dependency manager") case update := <-updateCh: - if gotCluster := update.VirtualHost.Routes[0].WeightedClusters[defaultTestClusterName]; gotCluster.Weight != 100 { + if gotCluster := update.VirtualHost.Routes[0].WeightedClusters[0]; gotCluster.Name != defaultTestClusterName || gotCluster.Weight != 100 { t.Fatalf("Update has wrong cluster, got: %v, want: %v", update.VirtualHost.Routes[0].WeightedClusters, defaultTestClusterName) } } @@ -762,7 +762,7 @@ func (s) TestRouteResourceUpdate(t *testing.T) { case <-ctx.Done(): t.Fatal("Timeout waiting for route update from dependency manager") case update := <-updateCh: - if gotCluster := update.VirtualHost.Routes[0].WeightedClusters[newClusterName]; gotCluster.Weight != 100 { + if gotCluster := update.VirtualHost.Routes[0].WeightedClusters[0]; gotCluster.Name != newClusterName || gotCluster.Weight != 100 { t.Fatalf("Second update has wrong cluster, got: %v, want: %v", update.VirtualHost.Routes[0].WeightedClusters, newClusterName) } } From b696279a94369df4e34e2159ff092d75537bf971 Mon Sep 17 00:00:00 2001 From: eshitachandwani Date: Thu, 16 Oct 2025 15:53:42 +0530 Subject: [PATCH 05/11] remove test --- .../xds_dependency_manager_test.go | 97 ------------------- 1 file changed, 97 deletions(-) diff --git a/internal/xds/xdsdependencymanager/xds_dependency_manager_test.go b/internal/xds/xdsdependencymanager/xds_dependency_manager_test.go index 41b0e3f20ef3..824e82d5c883 100644 --- a/internal/xds/xdsdependencymanager/xds_dependency_manager_test.go +++ b/internal/xds/xdsdependencymanager/xds_dependency_manager_test.go @@ -607,103 +607,6 @@ func (s) TestListenerResourceAmbientError(t *testing.T) { } } -// Tests the case where we get a route resource ambient error and verify that we -// correctly log the warning for it. To make sure we get a route resource -// ambient error, we send a correct update first , then send an invalid one and -// then send the valid resource again. We send the valid resource again so that -// we can be sure the abmient error reaches the dependency manager since there -// is no other way to wait for it . -func (s) TestRouteResourceAmbientError(t *testing.T) { - grpctest.ExpectWarning("Route resource ambient error") - nodeID := uuid.New().String() - mgmtServer, bc := setupManagementServerForTest(t, nodeID) - defer mgmtServer.Stop() - - updateCh := make(chan xdsresource.XDSConfig, 1) - errorCh := make(chan error, 1) - watcher := &testWatcher{ - onUpdate: func(cfg xdsresource.XDSConfig) { - updateCh <- cfg - }, - onError: func(err error) { - errorCh <- err - }, - } - ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) - defer cancel() - - // Configure a valid listener and route. - listener := e2e.DefaultClientListener(defaultTestServiceName, defaultTestRouteConfigName) - route := e2e.DefaultRouteConfig(defaultTestRouteConfigName, defaultTestServiceName, defaultTestClusterName) - resources := e2e.UpdateOptions{ - NodeID: nodeID, - Listeners: []*v3listenerpb.Listener{listener}, - Routes: []*v3routepb.RouteConfiguration{route}, - SkipValidation: true, - } - if err := mgmtServer.Update(ctx, resources); err != nil { - t.Fatal(err) - } - - dm := newDependencyManagerForTest(t, defaultTestServiceName, defaultTestServiceName, bc, watcher) - defer dm.Close() - - cmpOpts := []cmp.Option{ - cmpopts.EquateEmpty(), - cmpopts.IgnoreFields(xdsresource.HTTPFilter{}, "Filter", "Config"), - cmpopts.IgnoreFields(xdsresource.ListenerUpdate{}, "Raw"), - cmpopts.IgnoreFields(xdsresource.RouteConfigUpdate{}, "Raw"), - } - // Wait for the initial valid update. - select { - case <-ctx.Done(): - t.Fatal("Timeout waiting for initial update from dependency manager") - case update := <-updateCh: - if diff := cmp.Diff(wantXdsConfig, update, cmpOpts...); diff != "" { - t.Fatalf("Did not receive expected update from dependency manager,. Diff (-got +want):\n%v", diff) - } - } - - // Make the route resource invalid - resources.Routes[0].VirtualHosts[0].Routes[0].Match = nil - if err := mgmtServer.Update(ctx, resources); err != nil { - t.Fatal(err) - } - - // We expect no call to OnError or OnUpdate on our watcher. We just wait for - // a short duration to ensure that. - sCtx, sCancel := context.WithTimeout(ctx, defaultTestShortTimeout) - defer sCancel() - select { - case err := <-errorCh: - t.Fatalf("Unexpected call to OnError %v", err) - case update := <-updateCh: - t.Fatalf("Unexpected call to OnUpdate %v", pretty.ToJSON(update)) - case <-sCtx.Done(): - } - - // Send valide resources again - listener = e2e.DefaultClientListener(defaultTestServiceName, defaultTestRouteConfigName) - route = e2e.DefaultRouteConfig(defaultTestRouteConfigName, defaultTestServiceName, defaultTestClusterName) - resources = e2e.UpdateOptions{ - NodeID: nodeID, - Listeners: []*v3listenerpb.Listener{listener}, - Routes: []*v3routepb.RouteConfiguration{route}, - SkipValidation: true, - } - if err := mgmtServer.Update(ctx, resources); err != nil { - t.Fatal(err) - } - select { - case <-ctx.Done(): - t.Fatal("Timeout waiting for update from dependency manager") - case update := <-updateCh: - if diff := cmp.Diff(wantXdsConfig, update, cmpOpts...); diff != "" { - t.Fatalf("Did not receive expected update from dependency manager,. Diff (-got +want):\n%v", diff) - } - } -} - // Tests the case where the cluster name changes in the route resource update // and verify that each time OnUpdate is called with correct cluster name. func (s) TestRouteResourceUpdate(t *testing.T) { From ed609641705bb6dfbab81842664349ef6d0254b1 Mon Sep 17 00:00:00 2001 From: eshitachandwani Date: Fri, 17 Oct 2025 10:48:59 +0530 Subject: [PATCH 06/11] change test --- .../xds/xdsdependencymanager/xds_dependency_manager_test.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/internal/xds/xdsdependencymanager/xds_dependency_manager_test.go b/internal/xds/xdsdependencymanager/xds_dependency_manager_test.go index 824e82d5c883..bbc8bc230885 100644 --- a/internal/xds/xdsdependencymanager/xds_dependency_manager_test.go +++ b/internal/xds/xdsdependencymanager/xds_dependency_manager_test.go @@ -494,13 +494,13 @@ func (s) TestNoVirtualHost(t *testing.T) { } } -// Tests the case where we get a listener resource ambient error and verify that -// we correctly log the warning for it. To make sure we get a listener ambient +// Tests the case where we get an ambient error and verify that +// we correctly log the warning for it. To make sure we get an ambient // error, we send a correct update first , then send an invalid one and then // send the valid resource again. We send the valid resource again so that we // can be sure the abmient error reaches the dependency manager since there is // no other way to wait for it . -func (s) TestListenerResourceAmbientError(t *testing.T) { +func (s) TestAmbientError(t *testing.T) { grpctest.ExpectWarning("Listener resource ambient error") nodeID := uuid.New().String() mgmtServer, bc := setupManagementServerForTest(t, nodeID) From fff601305e156a55752b14e126c93ba86f5fbeee Mon Sep 17 00:00:00 2001 From: eshitachandwani Date: Thu, 23 Oct 2025 20:00:08 +0530 Subject: [PATCH 07/11] review comments --- .../xds/xdsclient/xdsresource/xdsconfig.go | 42 +++--- .../xds/xdsdependencymanager/watch_service.go | 33 ++++- .../xds_dependency_manager.go | 125 +++++++++--------- .../xds_dependency_manager_test.go | 99 ++++++++------ 4 files changed, 165 insertions(+), 134 deletions(-) diff --git a/internal/xds/xdsclient/xdsresource/xdsconfig.go b/internal/xds/xdsclient/xdsresource/xdsconfig.go index 8b8614edceb7..3a9e98444ad7 100644 --- a/internal/xds/xdsclient/xdsresource/xdsconfig.go +++ b/internal/xds/xdsclient/xdsresource/xdsconfig.go @@ -19,27 +19,21 @@ package xdsresource import "google.golang.org/grpc/resolver" -// XDSConfig holds the complete and resolved xDS resource configuration -// including LDS, RDS, CDS and endpoints. +// XDSConfig holds the complete gRPC client-side xDS configuration containing +// all necessary resources. type XDSConfig struct { - // Listener is the listener resource update - Listener ListenerUpdate + // Listener holds the listener configuration. + Listener *ListenerUpdate - // RouteConfig is the route configuration resource update. It will be - // populated even if RouteConfig is inlined into the Listener resource. + // RouteConfig is the route configuration. It will be populated even if + // RouteConfig is inlined into the Listener resource. RouteConfig RouteConfigUpdate - // VirtualHost is the virtual host from the route configuration matched with - // dataplane authority . + // VirtualHost selected from the route configuration whose domain field + // offers the best match against the provided dataplane authority. VirtualHost *VirtualHost - // Clusters maps the cluster name with the ClusterResult which will have - // either the cluster configuration or error. It will have an error status - // if either - // - // (a) there was an error and we did not already have a valid resource or - // - // (b) the resource does not exist. + // Clusters is a map from cluster name to its configuration. Clusters map[string]*ClusterResult } @@ -49,10 +43,10 @@ type ClusterResult struct { Err error } -// ClusterConfig contains cluster configuration for a single cluster. +// ClusterConfig contains configuration for a single cluster. type ClusterConfig struct { Cluster ClusterUpdate // Cluster configuration. Always present. - EndpointConfig EndpointConfig // Endpoint configuration for leaf clusters which will of type EDS or DNS. + EndpointConfig EndpointConfig // Endpoint configuration for leaf clusters. AggregateConfig AggregateConfig // List of children for aggregate clusters. } @@ -61,12 +55,13 @@ type AggregateConfig struct { LeafClusters []string } -// EndpointConfig contains resolved endpoints for a leaf cluster either from DNS -// or EDS watchers and error. +// EndpointConfig contains configuration corresponding to the endpoints in a +// cluster. Only one of EDSUpdate or DNSEndpoints will be populated based on the +// cluster type. type EndpointConfig struct { - EDSUpdate EndpointsUpdate // Resolved endpoints for EDS clusters. - DNSEndpoints DNSUpdate // Resolved endpoints for LOGICAL_DNS clusters. - ResolutionNote error // Error obtaining endpoints data + EDSUpdate EndpointsUpdate // Configuration for EDS clusters. + DNSEndpoints DNSUpdate // Configuration for LOGICAL_DNS clusters. + ResolutionNote error // Error obtaining endpoints data. } // DNSUpdate contains the update from DNS resolver. @@ -85,7 +80,8 @@ func SetXDSConfig(state resolver.State, config *XDSConfig) resolver.State { return state } -// XDSConfigFromResolverState returns XDSConfig stored in attribute in resolver state. +// XDSConfigFromResolverState returns XDßSConfig stored in attribute in resolver +// state. func XDSConfigFromResolverState(state resolver.State) *XDSConfig { state.Attributes.Value(xdsConfigkey{}) if v := state.Attributes.Value(xdsConfigkey{}); v != nil { diff --git a/internal/xds/xdsdependencymanager/watch_service.go b/internal/xds/xdsdependencymanager/watch_service.go index fb28c517c2be..dfe51d9c40b3 100644 --- a/internal/xds/xdsdependencymanager/watch_service.go +++ b/internal/xds/xdsdependencymanager/watch_service.go @@ -27,48 +27,68 @@ import ( type listenerWatcher struct { resourceName string cancel func() + isCancelled bool parent *DependencyManager } func newListenerWatcher(resourceName string, parent *DependencyManager) *listenerWatcher { lw := &listenerWatcher{resourceName: resourceName, parent: parent} lw.cancel = xdsresource.WatchListener(parent.xdsClient, resourceName, lw) + lw.isCancelled = false return lw } func (l *listenerWatcher) ResourceChanged(update *xdsresource.ListenerUpdate, onDone func()) { + if l.isCancelled { + return + } handleUpdate := func(context.Context) { l.parent.onListenerResourceUpdate(update); onDone() } l.parent.serializer.ScheduleOr(handleUpdate, onDone) } func (l *listenerWatcher) ResourceError(err error, onDone func()) { + if l.isCancelled { + return + } handleError := func(context.Context) { l.parent.onListenerResourceError(err); onDone() } l.parent.serializer.ScheduleOr(handleError, onDone) } func (l *listenerWatcher) AmbientError(err error, onDone func()) { + if l.isCancelled { + return + } handleError := func(context.Context) { l.parent.onListenerResourceAmbientError(err); onDone() } l.parent.serializer.ScheduleOr(handleError, onDone) } func (l *listenerWatcher) stop() { + l.isCancelled = true l.cancel() - l.parent.logger.Infof("Canceling watch on Listener resource %q", l.resourceName) + if logger.V(2) { + l.parent.logger.Infof("Canceling watch on Listener resource %q", l.resourceName) + } } type routeConfigWatcher struct { resourceName string cancel func() parent *DependencyManager + isCancelled bool } func newRouteConfigWatcher(resourceName string, parent *DependencyManager) *routeConfigWatcher { rw := &routeConfigWatcher{resourceName: resourceName, parent: parent} rw.cancel = xdsresource.WatchRouteConfig(parent.xdsClient, resourceName, rw) + rw.isCancelled = false return rw } func (r *routeConfigWatcher) ResourceChanged(u *xdsresource.RouteConfigResourceData, onDone func()) { + if r.isCancelled { + // Drop updates from canceled watchers. + return + } handleUpdate := func(context.Context) { r.parent.onRouteConfigResourceUpdate(r.resourceName, u.Resource) onDone() @@ -77,16 +97,25 @@ func (r *routeConfigWatcher) ResourceChanged(u *xdsresource.RouteConfigResourceD } func (r *routeConfigWatcher) ResourceError(err error, onDone func()) { + if r.isCancelled { + return + } handleError := func(context.Context) { r.parent.onRouteConfigResourceError(r.resourceName, err); onDone() } r.parent.serializer.ScheduleOr(handleError, onDone) } func (r *routeConfigWatcher) AmbientError(err error, onDone func()) { + if r.isCancelled { + return + } handleError := func(context.Context) { r.parent.onRouteConfigResourceAmbientError(r.resourceName, err); onDone() } r.parent.serializer.ScheduleOr(handleError, onDone) } func (r *routeConfigWatcher) stop() { + r.isCancelled = true r.cancel() - r.parent.logger.Infof("Canceling watch on RouteConfiguration resource %q", r.resourceName) + if logger.V(2) { + r.parent.logger.Infof("Canceling watch on RouteConfiguration resource %q", r.resourceName) + } } diff --git a/internal/xds/xdsdependencymanager/xds_dependency_manager.go b/internal/xds/xdsdependencymanager/xds_dependency_manager.go index 967c45194f7e..55d5c66680a9 100644 --- a/internal/xds/xdsdependencymanager/xds_dependency_manager.go +++ b/internal/xds/xdsdependencymanager/xds_dependency_manager.go @@ -14,36 +14,48 @@ * limitations under the License. */ -// Package xdsdepmgr contains functions, structs, and utilities for working with -// xds dependency manager. It will handle all the xds resources like listener, -// route, cluster and endpoint resources and their dependencies. +// Package xdsdepmgr provides the implementation of the xDS dependency manager +// that manages all the xDS watches and resources as described in gRFC A74. package xdsdepmgr import ( "context" + "fmt" - "google.golang.org/grpc/codes" "google.golang.org/grpc/internal/grpclog" "google.golang.org/grpc/internal/grpcsync" - "google.golang.org/grpc/internal/pretty" - "google.golang.org/grpc/internal/status" "google.golang.org/grpc/internal/xds/xdsclient" "google.golang.org/grpc/internal/xds/xdsclient/xdsresource" ) -// DependencyManager struct stores all the information needed for watching all -// the xds resources. +// ConfigWatcher is the interface for consumers of aggregated xDS configuration +// from the DependencyManager. The only consumer of this configuration is +// currently the xDS resolver. +type ConfigWatcher interface { + // OnUpdate is invoked by the dependency manager when a new, validated xDS + // configuration is available. + // + // Implementations must treat the received config as read-only and should + // not modify it. + OnUpdate(*xdsresource.XDSConfig) + + // OnError is invoked when an error is received in listener or route + // resource. This includes cases where: + // - The listener or route resource watcher reports a resource error. + // - The received listener resource is a socket listener, not an API listener - TODO : This is not yet implemented, tracked here #8114 + // - The received route configuration does not contain a virtual host + // matching the channel's default authority. + OnError(error) +} + +// DependencyManager registers watches on the xDS client for all required xDS +// resources, resolves dependencies between them, and returns a complete +// configuration to the xDS resolver. type DependencyManager struct { xdsClient xdsclient.XDSClient watcher ConfigWatcher - // All methods on the xdsResolver type except for the ones invoked by gRPC, - // i.e ResolveNow() and Close(), are guaranteed to execute in the context of - // this serializer's callback. And since the serializer guarantees mutual - // exclusion among these callbacks, we can get by without any mutexes to - // access all of the below defined state. The only exception is Close(), - // which does access some of this shared state, but it does so after - // cancelling the context passed to the serializer. + serializer *grpcsync.CallbackSerializer serializerCancel context.CancelFunc @@ -65,14 +77,15 @@ type DependencyManager struct { logger *grpclog.PrefixLogger } -// New creates a new dependency manager that manages all the xds resources -// including LDS, RDS, CDS and EDS and sends update once we have all the -// resources and sends an error when we get error in listener or route -// resources. -func New(listenername, dataplaneAuthority string, xdsClient xdsclient.XDSClient, watcher ConfigWatcher) *DependencyManager { - // Builds the dependency manager and starts the listener watch. +// New creates a new DependencyManager. +// +// listenerName is the name of the Listener resource to request from the management server. +// dataplaneAuthority is used to select the best matching virtual host from the route configuration received from the management server. +// +// Configuration updates and/or errors are delivered to the watcher. +func New(listenerName, dataplaneAuthority string, xdsClient xdsclient.XDSClient, watcher ConfigWatcher) *DependencyManager { dm := &DependencyManager{ - ldsResourceName: listenername, + ldsResourceName: listenerName, dataplaneAuthority: dataplaneAuthority, xdsClient: xdsClient, watcher: watcher, @@ -87,7 +100,7 @@ func New(listenername, dataplaneAuthority string, xdsClient xdsclient.XDSClient, // Start the listener watch. Listener watch will start the other resource // watches as needed. - dm.listenerWatcher = newListenerWatcher(listenername, dm) + dm.listenerWatcher = newListenerWatcher(listenerName, dm) return dm } @@ -107,49 +120,39 @@ func (m *DependencyManager) Close() { } } -// ConfigWatcher is notified of the XDSConfig resource updates and errors that -// are received by the xDS client from the management server. It only receives a -// XDSConfig update after all the xds resources have been received. -type ConfigWatcher interface { - // OnUpdate is invoked by the dependency manager to provide a new, - // validated xDS configuration to the watcher. - OnUpdate(xdsresource.XDSConfig) - - // OnError is invoked when an error is received in listener or route - // resource. This includes cases where: - // - The listener or route resource watcher reports a resource error. - // - The received listener resource is a socket listener, not an API listener - TODO : This is not yet implemented, tracked here #8114 - // - The received route configuration does not contain a virtual host - // matching the channel's default authority. - OnError(error) +// annotateErrorWithNodeID annotates the given error with the provided xDS node +// ID. +func (m *DependencyManager) annotateErrorWithNodeID(err error) error { + nodeID := m.xdsClient.BootstrapConfig().Node().GetId() + return fmt.Errorf("[xDS node id: %v]: %w", nodeID, err) } func (m *DependencyManager) maybeSendUpdate() { - if m.logger.V(2) { - m.logger.Infof("Sending update to watcher: Listener: %v, RouteConfig: %v", pretty.ToJSON(m.currentListenerUpdate), pretty.ToJSON(m.currentRouteConfig)) - } - m.watcher.OnUpdate(xdsresource.XDSConfig{ - Listener: *m.currentListenerUpdate, + m.logger.Infof("eshita listener update : %+v \n route update %+v", m.currentListenerUpdate, m.currentRouteConfig) + u := &xdsresource.XDSConfig{ + Listener: m.currentListenerUpdate, RouteConfig: m.currentRouteConfig, VirtualHost: m.currentVirtualHost, - }) + } + m.watcher.OnUpdate(u) + m.logger.Infof("Eshita Sent aggregated xDS config update to watcher %v", *u) } func (m *DependencyManager) applyRouteConfigUpdate(update xdsresource.RouteConfigUpdate) { - matchVh := xdsresource.FindBestMatchingVirtualHost(m.dataplaneAuthority, update.VirtualHosts) - if matchVh == nil { - m.watcher.OnError(status.Errorf(codes.Unavailable, "Could not find VirtualHost for %q", m.dataplaneAuthority)) + matchVH := xdsresource.FindBestMatchingVirtualHost(m.dataplaneAuthority, update.VirtualHosts) + if matchVH == nil { + m.watcher.OnError(fmt.Errorf("could not find VirtualHost for %q", m.dataplaneAuthority)) return } m.currentRouteConfig = update - m.currentVirtualHost = matchVh + m.currentVirtualHost = matchVH m.maybeSendUpdate() } // Only executed in the context of a serializer callback. func (m *DependencyManager) onListenerResourceUpdate(update *xdsresource.ListenerUpdate) { if m.logger.V(2) { - m.logger.Infof("Received update for Listener resource %q: %v", m.ldsResourceName, pretty.ToJSON(update)) + m.logger.Infof("Received update for Listener resource %q: %+v", m.ldsResourceName, update) } m.currentListenerUpdate = update @@ -192,7 +195,7 @@ func (m *DependencyManager) onListenerResourceUpdate(update *xdsresource.Listene // Only executed in the context of a serializer callback. func (m *DependencyManager) onListenerResourceError(err error) { if m.logger.V(2) { - m.logger.Infof("Received resource error for Listener resource %q: %v", m.ldsResourceName, err) + m.logger.Infof("Received resource error for Listener resource %q: %v", m.ldsResourceName, m.annotateErrorWithNodeID(err)) } if m.routeConfigWatcher != nil { @@ -201,23 +204,18 @@ func (m *DependencyManager) onListenerResourceError(err error) { m.rdsResourceName = "" m.currentVirtualHost = nil m.routeConfigWatcher = nil - m.watcher.OnError(status.Errorf(codes.Unavailable, "Listener resource error : %v", err)) + m.watcher.OnError(fmt.Errorf("listener resource error : %v", err)) } // Only executed in the context of a serializer callback. func (m *DependencyManager) onListenerResourceAmbientError(err error) { - m.logger.Warningf("Listener resource ambient error: %v", err) + m.logger.Warningf("Listener resource ambient error: %v", m.annotateErrorWithNodeID(err)) } // Only executed in the context of a serializer callback. func (m *DependencyManager) onRouteConfigResourceUpdate(resourceName string, update xdsresource.RouteConfigUpdate) { if m.logger.V(2) { - m.logger.Infof("Received update for RouteConfiguration resource %q: %v", resourceName, pretty.ToJSON(update)) - } - - if m.rdsResourceName != resourceName { - // Drop updates from canceled watchers. - return + m.logger.Infof("Received update for RouteConfiguration resource %q: %+v", resourceName, update) } m.applyRouteConfigUpdate(update) } @@ -225,17 +223,12 @@ func (m *DependencyManager) onRouteConfigResourceUpdate(resourceName string, upd // Only executed in the context of a serializer callback. func (m *DependencyManager) onRouteConfigResourceError(resourceName string, err error) { if m.logger.V(2) { - m.logger.Infof("Received resource error for RouteConfiguration resource %q: %v", resourceName, err) - } - - //If update is not for the current watcher - if m.rdsResourceName != resourceName { - return + m.logger.Infof("Received resource error for RouteConfiguration resource %q: %v", resourceName, m.annotateErrorWithNodeID(err)) } - m.watcher.OnError(status.Errorf(codes.Unavailable, "Route resource error : %v", err)) + m.watcher.OnError(fmt.Errorf("route resource error : %v", err)) } // Only executed in the context of a serializer callback. func (m *DependencyManager) onRouteConfigResourceAmbientError(resourceName string, err error) { - m.logger.Warningf("Route resource ambient error %q: %v", resourceName, err) + m.logger.Warningf("Route resource ambient error %q: %v", resourceName, m.annotateErrorWithNodeID(err)) } diff --git a/internal/xds/xdsdependencymanager/xds_dependency_manager_test.go b/internal/xds/xdsdependencymanager/xds_dependency_manager_test.go index bbc8bc230885..823761dccfd4 100644 --- a/internal/xds/xdsdependencymanager/xds_dependency_manager_test.go +++ b/internal/xds/xdsdependencymanager/xds_dependency_manager_test.go @@ -29,15 +29,12 @@ import ( "github.com/google/go-cmp/cmp" "github.com/google/go-cmp/cmp/cmpopts" "github.com/google/uuid" - "google.golang.org/grpc/codes" "google.golang.org/grpc/internal/grpctest" - "google.golang.org/grpc/internal/pretty" "google.golang.org/grpc/internal/testutils" "google.golang.org/grpc/internal/testutils/xds/e2e" "google.golang.org/grpc/internal/xds/bootstrap" "google.golang.org/grpc/internal/xds/xdsclient" "google.golang.org/grpc/internal/xds/xdsclient/xdsresource" - "google.golang.org/grpc/status" v3listenerpb "github.com/envoyproxy/go-control-plane/envoy/config/listener/v3" v3routepb "github.com/envoyproxy/go-control-plane/envoy/config/route/v3" @@ -65,7 +62,7 @@ const ( ) var wantXdsConfig = xdsresource.XDSConfig{ - Listener: xdsresource.ListenerUpdate{ + Listener: &xdsresource.ListenerUpdate{ RouteConfigName: defaultTestRouteConfigName, HTTPFilters: []xdsresource.HTTPFilter{{Name: "router"}}}, RouteConfig: xdsresource.RouteConfigUpdate{ @@ -93,12 +90,12 @@ func newStringP(s string) *string { // testWatcher is a mock implementation of the ConfigWatcher interface // that allows defining custom logic for its methods in each test. type testWatcher struct { - onUpdate func(xdsresource.XDSConfig) + onUpdate func(*xdsresource.XDSConfig) onError func(error) } // OnUpdate calls the underlying onUpdate function if it's not nil. -func (w *testWatcher) OnUpdate(cfg xdsresource.XDSConfig) { +func (w *testWatcher) OnUpdate(cfg *xdsresource.XDSConfig) { if w.onUpdate != nil { w.onUpdate(cfg) } @@ -111,16 +108,13 @@ func (w *testWatcher) OnError(err error) { } } -func verifyError(gotErr error, wantCode codes.Code, wantErr, wantNodeID string) error { +func verifyError(gotErr error, wantErr, wantNodeID string) error { if gotErr == nil { - return fmt.Errorf("got nil error from resolver, want error with code %v", wantCode) + return fmt.Errorf("got nil error from resolver, want error") } if !strings.Contains(gotErr.Error(), wantErr) { return fmt.Errorf("got error from resolver %q, want %q", gotErr, wantErr) } - if gotCode := status.Code(gotErr); gotCode != wantCode { - return fmt.Errorf("got error from resolver with code %v, want %v", gotCode, wantCode) - } if !strings.Contains(gotErr.Error(), wantNodeID) { return fmt.Errorf("got error from resolver %q, want nodeID %q", gotErr, wantNodeID) } @@ -178,8 +172,8 @@ func (s) TestHappyCase(t *testing.T) { updateCh := make(chan xdsresource.XDSConfig, 1) watcher := &testWatcher{ - onUpdate: func(cfg xdsresource.XDSConfig) { - updateCh <- cfg + onUpdate: func(cfg *xdsresource.XDSConfig) { + updateCh <- *cfg }, onError: func(err error) { t.Errorf("Received unexpected error from dependency manager: %v", err) @@ -228,8 +222,8 @@ func (s) TestInlineRouteConfig(t *testing.T) { updateCh := make(chan xdsresource.XDSConfig, 1) watcher := &testWatcher{ - onUpdate: func(cfg xdsresource.XDSConfig) { - updateCh <- cfg + onUpdate: func(cfg *xdsresource.XDSConfig) { + updateCh <- *cfg }, onError: func(err error) { t.Errorf("Received unexpected error from dependency manager: %v", err) @@ -266,16 +260,34 @@ func (s) TestInlineRouteConfig(t *testing.T) { dm := newDependencyManagerForTest(t, defaultTestServiceName, defaultTestServiceName, bc, watcher) defer dm.Close() - wantConfig := wantXdsConfig - wantConfig.Listener.RouteConfigName = "" - wantConfig.Listener.InlineRouteConfig = &xdsresource.RouteConfigUpdate{ - VirtualHosts: []*xdsresource.VirtualHost{ - { - Domains: []string{defaultTestServiceName}, - Routes: []*xdsresource.Route{{Prefix: newStringP("/"), - WeightedClusters: []xdsresource.WeightedCluster{{Name: defaultTestClusterName, Weight: 100}}, - ActionType: xdsresource.RouteActionRoute}}, + wantConfig := xdsresource.XDSConfig{ + Listener: &xdsresource.ListenerUpdate{ + InlineRouteConfig: &xdsresource.RouteConfigUpdate{ + VirtualHosts: []*xdsresource.VirtualHost{ + { + Domains: []string{defaultTestServiceName}, + Routes: []*xdsresource.Route{{Prefix: newStringP("/"), + WeightedClusters: []xdsresource.WeightedCluster{{Name: defaultTestClusterName, Weight: 100}}, + ActionType: xdsresource.RouteActionRoute}}, + }, + }, }, + HTTPFilters: []xdsresource.HTTPFilter{{Name: "router"}}}, + RouteConfig: xdsresource.RouteConfigUpdate{ + VirtualHosts: []*xdsresource.VirtualHost{ + { + Domains: []string{defaultTestServiceName}, + Routes: []*xdsresource.Route{{Prefix: newStringP("/"), + WeightedClusters: []xdsresource.WeightedCluster{{Name: defaultTestClusterName, Weight: 100}}, + ActionType: xdsresource.RouteActionRoute}}, + }, + }, + }, + VirtualHost: &xdsresource.VirtualHost{ + Domains: []string{defaultTestServiceName}, + Routes: []*xdsresource.Route{{Prefix: newStringP("/"), + WeightedClusters: []xdsresource.WeightedCluster{{Name: defaultTestClusterName, Weight: 100}}, + ActionType: xdsresource.RouteActionRoute}}, }, } cmpOpts := []cmp.Option{ @@ -304,8 +316,8 @@ func (s) TestIncompleteResources(t *testing.T) { updateCh := make(chan xdsresource.XDSConfig, 1) watcher := &testWatcher{ - onUpdate: func(cfg xdsresource.XDSConfig) { - updateCh <- cfg + onUpdate: func(cfg *xdsresource.XDSConfig) { + updateCh <- *cfg }, onError: func(err error) { t.Errorf("Received unexpected error from dependency manager: %v", err) @@ -330,7 +342,7 @@ func (s) TestIncompleteResources(t *testing.T) { select { case <-sCtx.Done(): case update := <-updateCh: - t.Fatalf("Received unexpected update from dependency manager: %v", pretty.ToJSON(update)) + t.Fatalf("Received unexpected update from dependency manager: %+v", update) } } @@ -345,18 +357,19 @@ func (s) TestListenerResourceError(t *testing.T) { errorCh := make(chan error, 1) updateCh := make(chan xdsresource.XDSConfig, 1) watcher := &testWatcher{ - onUpdate: func(cfg xdsresource.XDSConfig) { - updateCh <- cfg + onUpdate: func(cfg *xdsresource.XDSConfig) { + updateCh <- *cfg }, onError: func(err error) { errorCh <- err }, } - ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute) defer cancel() // Send a correct update first listener := e2e.DefaultClientListener(defaultTestServiceName, defaultTestRouteConfigName) + listener.FilterChains = nil route := e2e.DefaultRouteConfig(defaultTestRouteConfigName, defaultTestServiceName, defaultTestClusterName) resources := e2e.UpdateOptions{ NodeID: nodeID, @@ -381,7 +394,7 @@ func (s) TestListenerResourceError(t *testing.T) { case <-ctx.Done(): t.Fatal("Timeout waiting for update from dependency manager") case update := <-updateCh: - if diff := cmp.Diff(wantXdsConfig, update, cmpOpts...); diff != "" { + if diff := cmp.Diff(update, wantXdsConfig, cmpOpts...); diff != "" { t.Fatalf("Did not receive expected update from dependency manager,. Diff (-got +want):\n%v", diff) } } @@ -393,7 +406,7 @@ func (s) TestListenerResourceError(t *testing.T) { } select { case err := <-errorCh: - if err := verifyError(err, codes.Unavailable, fmt.Sprintf("xds: resource %q of type %q has been removed", defaultTestServiceName, "ListenerResource"), nodeID); err != nil { + if err := verifyError(err, fmt.Sprintf("xds: resource %q of type %q has been removed", defaultTestServiceName, "ListenerResource"), nodeID); err != nil { t.Fatal(err) } case <-ctx.Done(): @@ -411,7 +424,7 @@ func (s) TestRouteResourceError(t *testing.T) { errorCh := make(chan error, 1) watcher := &testWatcher{ - onUpdate: func(xdsresource.XDSConfig) { + onUpdate: func(*xdsresource.XDSConfig) { t.Errorf("Received unexpected update from dependency manager") }, onError: func(err error) { @@ -441,7 +454,7 @@ func (s) TestRouteResourceError(t *testing.T) { select { case err := <-errorCh: - if err := verifyError(err, codes.Unavailable, "Route resource error", nodeID); err != nil { + if err := verifyError(err, "route resource error", nodeID); err != nil { t.Fatal(err) } case <-ctx.Done(): @@ -458,7 +471,7 @@ func (s) TestNoVirtualHost(t *testing.T) { errorCh := make(chan error, 1) watcher := &testWatcher{ - onUpdate: func(xdsresource.XDSConfig) { + onUpdate: func(*xdsresource.XDSConfig) { t.Errorf("Received unexpected update from dependency manager") }, onError: func(err error) { @@ -486,7 +499,7 @@ func (s) TestNoVirtualHost(t *testing.T) { select { case err := <-errorCh: - if err := verifyError(err, codes.Unavailable, "Could not find VirtualHost", ""); err != nil { + if err := verifyError(err, "could not find VirtualHost", ""); err != nil { t.Fatal(err) } case <-ctx.Done(): @@ -509,8 +522,8 @@ func (s) TestAmbientError(t *testing.T) { updateCh := make(chan xdsresource.XDSConfig, 1) errorCh := make(chan error, 1) watcher := &testWatcher{ - onUpdate: func(cfg xdsresource.XDSConfig) { - updateCh <- cfg + onUpdate: func(cfg *xdsresource.XDSConfig) { + updateCh <- *cfg }, onError: func(err error) { errorCh <- err @@ -546,7 +559,7 @@ func (s) TestAmbientError(t *testing.T) { case <-ctx.Done(): t.Fatal("Timeout waiting for initial update from dependency manager") case update := <-updateCh: - if diff := cmp.Diff(wantXdsConfig, update, cmpOpts...); diff != "" { + if diff := cmp.Diff(update, wantXdsConfig, cmpOpts...); diff != "" { t.Fatalf("Did not receive expected update from dependency manager,. Diff (-got +want):\n%v", diff) } } @@ -581,7 +594,7 @@ func (s) TestAmbientError(t *testing.T) { case err := <-errorCh: t.Fatalf("Unexpected call to OnError %v", err) case update := <-updateCh: - t.Fatalf("Unexpected call to OnUpdate %v", pretty.ToJSON(update)) + t.Fatalf("Unexpected call to OnUpdate %+v", update) case <-sCtx.Done(): } @@ -601,7 +614,7 @@ func (s) TestAmbientError(t *testing.T) { case <-ctx.Done(): t.Fatal("Timeout waiting for initial update from dependency manager") case update := <-updateCh: - if diff := cmp.Diff(wantXdsConfig, update, cmpOpts...); diff != "" { + if diff := cmp.Diff(update, wantXdsConfig, cmpOpts...); diff != "" { t.Fatalf("Did not receive expected update from dependency manager,. Diff (-got +want):\n%v", diff) } } @@ -616,8 +629,8 @@ func (s) TestRouteResourceUpdate(t *testing.T) { updateCh := make(chan xdsresource.XDSConfig, 1) watcher := &testWatcher{ - onUpdate: func(cfg xdsresource.XDSConfig) { - updateCh <- cfg + onUpdate: func(cfg *xdsresource.XDSConfig) { + updateCh <- *cfg }, onError: func(err error) { t.Errorf("Received unexpected error from dependency manager: %v", err) From 58478e6f7097bac5082d2f8c55bf245cd17f3e49 Mon Sep 17 00:00:00 2001 From: eshitachandwani Date: Fri, 24 Oct 2025 12:17:46 +0530 Subject: [PATCH 08/11] cancelled watcher updates --- .../xds/xdsdependencymanager/watch_service.go | 29 ++++++++++++++----- .../xds_dependency_manager.go | 7 ++--- 2 files changed, 23 insertions(+), 13 deletions(-) diff --git a/internal/xds/xdsdependencymanager/watch_service.go b/internal/xds/xdsdependencymanager/watch_service.go index dfe51d9c40b3..a0dcb463ea91 100644 --- a/internal/xds/xdsdependencymanager/watch_service.go +++ b/internal/xds/xdsdependencymanager/watch_service.go @@ -34,7 +34,6 @@ type listenerWatcher struct { func newListenerWatcher(resourceName string, parent *DependencyManager) *listenerWatcher { lw := &listenerWatcher{resourceName: resourceName, parent: parent} lw.cancel = xdsresource.WatchListener(parent.xdsClient, resourceName, lw) - lw.isCancelled = false return lw } @@ -42,7 +41,10 @@ func (l *listenerWatcher) ResourceChanged(update *xdsresource.ListenerUpdate, on if l.isCancelled { return } - handleUpdate := func(context.Context) { l.parent.onListenerResourceUpdate(update); onDone() } + handleUpdate := func(context.Context) { + l.parent.onListenerResourceUpdate(update) + onDone() + } l.parent.serializer.ScheduleOr(handleUpdate, onDone) } @@ -50,7 +52,11 @@ func (l *listenerWatcher) ResourceError(err error, onDone func()) { if l.isCancelled { return } - handleError := func(context.Context) { l.parent.onListenerResourceError(err); onDone() } + handleError := func(context.Context) { + l.parent.onListenerResourceError(err) + onDone() + } + l.parent.serializer.ScheduleOr(handleError, onDone) } @@ -58,7 +64,10 @@ func (l *listenerWatcher) AmbientError(err error, onDone func()) { if l.isCancelled { return } - handleError := func(context.Context) { l.parent.onListenerResourceAmbientError(err); onDone() } + handleError := func(context.Context) { + l.parent.onListenerResourceAmbientError(err) + onDone() + } l.parent.serializer.ScheduleOr(handleError, onDone) } @@ -80,13 +89,11 @@ type routeConfigWatcher struct { func newRouteConfigWatcher(resourceName string, parent *DependencyManager) *routeConfigWatcher { rw := &routeConfigWatcher{resourceName: resourceName, parent: parent} rw.cancel = xdsresource.WatchRouteConfig(parent.xdsClient, resourceName, rw) - rw.isCancelled = false return rw } func (r *routeConfigWatcher) ResourceChanged(u *xdsresource.RouteConfigResourceData, onDone func()) { if r.isCancelled { - // Drop updates from canceled watchers. return } handleUpdate := func(context.Context) { @@ -100,7 +107,10 @@ func (r *routeConfigWatcher) ResourceError(err error, onDone func()) { if r.isCancelled { return } - handleError := func(context.Context) { r.parent.onRouteConfigResourceError(r.resourceName, err); onDone() } + handleError := func(context.Context) { + r.parent.onRouteConfigResourceError(r.resourceName, err) + onDone() + } r.parent.serializer.ScheduleOr(handleError, onDone) } @@ -108,7 +118,10 @@ func (r *routeConfigWatcher) AmbientError(err error, onDone func()) { if r.isCancelled { return } - handleError := func(context.Context) { r.parent.onRouteConfigResourceAmbientError(r.resourceName, err); onDone() } + handleError := func(context.Context) { + r.parent.onRouteConfigResourceAmbientError(r.resourceName, err) + onDone() + } r.parent.serializer.ScheduleOr(handleError, onDone) } diff --git a/internal/xds/xdsdependencymanager/xds_dependency_manager.go b/internal/xds/xdsdependencymanager/xds_dependency_manager.go index 55d5c66680a9..75e64f233a0f 100644 --- a/internal/xds/xdsdependencymanager/xds_dependency_manager.go +++ b/internal/xds/xdsdependencymanager/xds_dependency_manager.go @@ -128,14 +128,11 @@ func (m *DependencyManager) annotateErrorWithNodeID(err error) error { } func (m *DependencyManager) maybeSendUpdate() { - m.logger.Infof("eshita listener update : %+v \n route update %+v", m.currentListenerUpdate, m.currentRouteConfig) - u := &xdsresource.XDSConfig{ + m.watcher.OnUpdate(&xdsresource.XDSConfig{ Listener: m.currentListenerUpdate, RouteConfig: m.currentRouteConfig, VirtualHost: m.currentVirtualHost, - } - m.watcher.OnUpdate(u) - m.logger.Infof("Eshita Sent aggregated xDS config update to watcher %v", *u) + }) } func (m *DependencyManager) applyRouteConfigUpdate(update xdsresource.RouteConfigUpdate) { From c279455ce81a8d046424d53cb4a3d685879e7dea Mon Sep 17 00:00:00 2001 From: eshitachandwani Date: Fri, 24 Oct 2025 14:54:28 +0530 Subject: [PATCH 09/11] fine changes --- .../xds/xdsclient/xdsresource/xdsconfig.go | 10 ++- .../xds_dependency_manager.go | 2 - .../xds_dependency_manager_test.go | 87 ++++++++----------- 3 files changed, 45 insertions(+), 54 deletions(-) diff --git a/internal/xds/xdsclient/xdsresource/xdsconfig.go b/internal/xds/xdsclient/xdsresource/xdsconfig.go index 3a9e98444ad7..2c553a43f063 100644 --- a/internal/xds/xdsclient/xdsresource/xdsconfig.go +++ b/internal/xds/xdsclient/xdsresource/xdsconfig.go @@ -37,7 +37,11 @@ type XDSConfig struct { Clusters map[string]*ClusterResult } -// ClusterResult contains either a cluster's configuration or an error. +// ClusterResult contains a cluster's configuration when we receive a +// valid resource from the management server. It contains an error when: +// - we receive an invalid resource from the management server and +// we did not already have a valid resource or +// - the cluster resource does not exist on the management server type ClusterResult struct { Config ClusterConfig Err error @@ -50,7 +54,7 @@ type ClusterConfig struct { AggregateConfig AggregateConfig // List of children for aggregate clusters. } -// AggregateConfig contains a list of leaf cluster names. +// AggregateConfig contains a list of leaf cluster names for . type AggregateConfig struct { LeafClusters []string } @@ -80,7 +84,7 @@ func SetXDSConfig(state resolver.State, config *XDSConfig) resolver.State { return state } -// XDSConfigFromResolverState returns XDßSConfig stored in attribute in resolver +// XDSConfigFromResolverState returns XDSConfig stored in attribute in resolver // state. func XDSConfigFromResolverState(state resolver.State) *XDSConfig { state.Attributes.Value(xdsConfigkey{}) diff --git a/internal/xds/xdsdependencymanager/xds_dependency_manager.go b/internal/xds/xdsdependencymanager/xds_dependency_manager.go index 75e64f233a0f..8607ab3fcfc7 100644 --- a/internal/xds/xdsdependencymanager/xds_dependency_manager.go +++ b/internal/xds/xdsdependencymanager/xds_dependency_manager.go @@ -162,13 +162,11 @@ func (m *DependencyManager) onListenerResourceUpdate(update *xdsresource.Listene m.routeConfigWatcher.stop() m.routeConfigWatcher = nil } - m.applyRouteConfigUpdate(*update.InlineRouteConfig) return } // We get here only if there was no inline route configuration. - // If the route config name has not changed, send an update with existing // route configuration and the newly received listener configuration. if m.rdsResourceName == update.RouteConfigName { diff --git a/internal/xds/xdsdependencymanager/xds_dependency_manager_test.go b/internal/xds/xdsdependencymanager/xds_dependency_manager_test.go index 823761dccfd4..b55d642daa99 100644 --- a/internal/xds/xdsdependencymanager/xds_dependency_manager_test.go +++ b/internal/xds/xdsdependencymanager/xds_dependency_manager_test.go @@ -61,34 +61,44 @@ const ( defaultTestClusterName = "cluster-name" ) -var wantXdsConfig = xdsresource.XDSConfig{ - Listener: &xdsresource.ListenerUpdate{ - RouteConfigName: defaultTestRouteConfigName, - HTTPFilters: []xdsresource.HTTPFilter{{Name: "router"}}}, - RouteConfig: xdsresource.RouteConfigUpdate{ - VirtualHosts: []*xdsresource.VirtualHost{ - { - Domains: []string{defaultTestServiceName}, - Routes: []*xdsresource.Route{{Prefix: newStringP("/"), - WeightedClusters: []xdsresource.WeightedCluster{{Name: defaultTestClusterName, Weight: 100}}, - ActionType: xdsresource.RouteActionRoute}}, +var ( + cmpOpts = []cmp.Option{ + cmpopts.EquateEmpty(), + cmpopts.IgnoreFields(xdsresource.HTTPFilter{}, "Filter", "Config"), + cmpopts.IgnoreFields(xdsresource.ListenerUpdate{}, "Raw"), + cmpopts.IgnoreFields(xdsresource.RouteConfigUpdate{}, "Raw"), + } + + wantXdsConfig = xdsresource.XDSConfig{ + Listener: &xdsresource.ListenerUpdate{ + RouteConfigName: defaultTestRouteConfigName, + HTTPFilters: []xdsresource.HTTPFilter{{Name: "router"}}}, + RouteConfig: xdsresource.RouteConfigUpdate{ + VirtualHosts: []*xdsresource.VirtualHost{ + { + Domains: []string{defaultTestServiceName}, + Routes: []*xdsresource.Route{{Prefix: newStringP("/"), + WeightedClusters: []xdsresource.WeightedCluster{{Name: defaultTestClusterName, Weight: 100}}, + ActionType: xdsresource.RouteActionRoute}}, + }, }, }, - }, - VirtualHost: &xdsresource.VirtualHost{ - Domains: []string{defaultTestServiceName}, - Routes: []*xdsresource.Route{{Prefix: newStringP("/"), - WeightedClusters: []xdsresource.WeightedCluster{{Name: defaultTestClusterName, Weight: 100}}, - ActionType: xdsresource.RouteActionRoute}}, - }, -} + VirtualHost: &xdsresource.VirtualHost{ + Domains: []string{defaultTestServiceName}, + Routes: []*xdsresource.Route{{Prefix: newStringP("/"), + WeightedClusters: []xdsresource.WeightedCluster{{Name: defaultTestClusterName, Weight: 100}}, + ActionType: xdsresource.RouteActionRoute}}, + }, + } +) func newStringP(s string) *string { return &s } -// testWatcher is a mock implementation of the ConfigWatcher interface -// that allows defining custom logic for its methods in each test. +// testWatcher is a mock implementation of the ConfigWatcher interface that +// allows defining custom logic for its onUpdate and onError methods in each +// test. type testWatcher struct { onUpdate func(*xdsresource.XDSConfig) onError func(error) @@ -164,7 +174,8 @@ func setupManagementServerForTest(t *testing.T, nodeID string) (*e2e.ManagementS } // Tests the happy case where the dependency manager receives all the required -// resources and calls OnUpdate with the correct XDSConfig. +// resources and verifies that OnUpdate is called with with the correct +// XDSConfig. func (s) TestHappyCase(t *testing.T) { nodeID := uuid.New().String() mgmtServer, bc := setupManagementServerForTest(t, nodeID) @@ -179,8 +190,10 @@ func (s) TestHappyCase(t *testing.T) { t.Errorf("Received unexpected error from dependency manager: %v", err) }, } + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) defer cancel() + listener := e2e.DefaultClientListener(defaultTestServiceName, defaultTestRouteConfigName) route := e2e.DefaultRouteConfig(defaultTestRouteConfigName, defaultTestServiceName, defaultTestClusterName) resources := e2e.UpdateOptions{ @@ -195,19 +208,12 @@ func (s) TestHappyCase(t *testing.T) { dm := newDependencyManagerForTest(t, defaultTestServiceName, defaultTestServiceName, bc, watcher) defer dm.Close() - cmpOpts := []cmp.Option{ - cmpopts.EquateEmpty(), - cmpopts.IgnoreFields(xdsresource.HTTPFilter{}, "Filter", "Config"), - cmpopts.IgnoreFields(xdsresource.ListenerUpdate{}, "Raw"), - cmpopts.IgnoreFields(xdsresource.RouteConfigUpdate{}, "Raw"), - } - select { case <-ctx.Done(): t.Fatal("Timeout waiting for update from dependency manager") case update := <-updateCh: if diff := cmp.Diff(update, wantXdsConfig, cmpOpts...); diff != "" { - t.Fatalf("Did not receive expected update from dependency manager,. Diff (-got +want):\n%v", diff) + t.Fatalf("Did not receive expected update from dependency manager. Diff (-got +want):\n%v", diff) } } @@ -290,12 +296,7 @@ func (s) TestInlineRouteConfig(t *testing.T) { ActionType: xdsresource.RouteActionRoute}}, }, } - cmpOpts := []cmp.Option{ - cmpopts.EquateEmpty(), - cmpopts.IgnoreFields(xdsresource.HTTPFilter{}, "Filter", "Config"), - cmpopts.IgnoreFields(xdsresource.ListenerUpdate{}, "Raw"), - cmpopts.IgnoreFields(xdsresource.RouteConfigUpdate{}, "Raw"), - } + select { case <-ctx.Done(): t.Fatal("Timeout waiting for update from dependency manager") @@ -384,12 +385,6 @@ func (s) TestListenerResourceError(t *testing.T) { dm := newDependencyManagerForTest(t, defaultTestServiceName, defaultTestServiceName, bc, watcher) defer dm.Close() - cmpOpts := []cmp.Option{ - cmpopts.EquateEmpty(), - cmpopts.IgnoreFields(xdsresource.HTTPFilter{}, "Filter", "Config"), - cmpopts.IgnoreFields(xdsresource.ListenerUpdate{}, "Raw"), - cmpopts.IgnoreFields(xdsresource.RouteConfigUpdate{}, "Raw"), - } select { case <-ctx.Done(): t.Fatal("Timeout waiting for update from dependency manager") @@ -548,12 +543,6 @@ func (s) TestAmbientError(t *testing.T) { dm := newDependencyManagerForTest(t, defaultTestServiceName, defaultTestServiceName, bc, watcher) defer dm.Close() - cmpOpts := []cmp.Option{ - cmpopts.EquateEmpty(), - cmpopts.IgnoreFields(xdsresource.HTTPFilter{}, "Filter", "Config"), - cmpopts.IgnoreFields(xdsresource.ListenerUpdate{}, "Raw"), - cmpopts.IgnoreFields(xdsresource.RouteConfigUpdate{}, "Raw"), - } // Wait for the initial valid update. select { case <-ctx.Done(): @@ -598,7 +587,7 @@ func (s) TestAmbientError(t *testing.T) { case <-sCtx.Done(): } - // Send valide resources again + // Send valid resources again. listener = e2e.DefaultClientListener(defaultTestServiceName, defaultTestRouteConfigName) route = e2e.DefaultRouteConfig(defaultTestRouteConfigName, defaultTestServiceName, defaultTestClusterName) resources = e2e.UpdateOptions{ From 1457ea2b0e3126d5a34287586ba74d0938b1321e Mon Sep 17 00:00:00 2001 From: eshitachandwani Date: Wed, 29 Oct 2025 14:08:03 +0530 Subject: [PATCH 10/11] review comments --- internal/grpctest/tlogger.go | 57 ++++++++----------- .../xds/xdsclient/xdsresource/xdsconfig.go | 40 +++++++++---- .../xds/xdsdependencymanager/watch_service.go | 12 ++++ .../xds_dependency_manager.go | 11 ++-- .../xds_dependency_manager_test.go | 4 +- 5 files changed, 72 insertions(+), 52 deletions(-) diff --git a/internal/grpctest/tlogger.go b/internal/grpctest/tlogger.go index 0d083b048ab6..756611b9ff78 100644 --- a/internal/grpctest/tlogger.go +++ b/internal/grpctest/tlogger.go @@ -66,11 +66,10 @@ type tLogger struct { v int initialized bool - mu sync.Mutex // guards t, start, and errors - t *testing.T - start time.Time - errors map[*regexp.Regexp]int - warnings map[*regexp.Regexp]int + mu sync.Mutex + t *testing.T + start time.Time + logs []map[*regexp.Regexp]int } func init() { @@ -88,7 +87,7 @@ func init() { } } // Initialize tLogr with the determined verbosity level. - tLogr = &tLogger{errors: make(map[*regexp.Regexp]int), warnings: make(map[*regexp.Regexp]int), v: vLevel} + tLogr = &tLogger{logs: []map[*regexp.Regexp]int{{}, {}, {}, {}}, v: vLevel} } // getCallingPrefix returns the at the given depth from the stack. @@ -116,13 +115,13 @@ func (tl *tLogger) log(ltype logType, depth int, format string, args ...any) { switch ltype { case errorLog: // fmt.Sprintln is used rather than fmt.Sprint because tl.Log uses fmt.Sprintln behavior. - if tl.expected(fmt.Sprintln(args...)) { + if tl.expected(fmt.Sprintln(args...), int(errorLog)) { tl.t.Log(args...) } else { tl.t.Error(args...) } case warningLog: - tl.expectedWarning(fmt.Sprintln(args...)) + tl.expected(fmt.Sprintln(args...), int(warningLog)) tl.t.Log(args...) case fatalLog: panic(fmt.Sprint(args...)) @@ -134,11 +133,14 @@ func (tl *tLogger) log(ltype logType, depth int, format string, args ...any) { format = "%v " + format + "%s" switch ltype { case errorLog: - if tl.expected(fmt.Sprintf(format, args...)) { + if tl.expected(fmt.Sprintf(format, args...), int(errorLog)) { tl.t.Logf(format, args...) } else { tl.t.Errorf(format, args...) } + case warningLog: + tl.expected(fmt.Sprintln(args...), int(warningLog)) + tl.t.Log(args...) case fatalLog: panic(fmt.Sprintf(format, args...)) default: @@ -158,8 +160,7 @@ func (tl *tLogger) update(t *testing.T) { } tl.t = t tl.start = time.Now() - tl.errors = map[*regexp.Regexp]int{} - tl.warnings = map[*regexp.Regexp]int{} + tl.logs = []map[*regexp.Regexp]int{{}, {}, {}, {}} } // ExpectError declares an error to be expected. For the next test, the first @@ -180,7 +181,7 @@ func ExpectErrorN(expr string, n int) { tLogr.t.Error(err) return } - tLogr.errors[re] += n + tLogr.logs[errorLog][re] += n } // ExpectWarning declares a warning to be expected. @@ -192,34 +193,34 @@ func ExpectWarning(expr string) { tLogr.t.Error(err) return } - tLogr.warnings[re]++ + tLogr.logs[warningLog][re]++ } // endTest checks if expected errors were not encountered. func (tl *tLogger) endTest(t *testing.T) { tl.mu.Lock() defer tl.mu.Unlock() - for re, count := range tl.errors { + for re, count := range tl.logs[errorLog] { if count > 0 { t.Errorf("Expected error '%v' not encountered", re.String()) } } - tl.errors = map[*regexp.Regexp]int{} - for re, count := range tl.warnings { + for re, count := range tl.logs[warningLog] { if count > 0 { t.Errorf("Expected warning '%v' not encountered", re.String()) } } - tl.warnings = map[*regexp.Regexp]int{} + tl.logs = []map[*regexp.Regexp]int{{}, {}, {}, {}} } -// expected determines if the error string is protected or not. -func (tl *tLogger) expected(s string) bool { - for re, count := range tl.errors { +// expected determines if the log string of the particular type is protected or +// not. +func (tl *tLogger) expected(s string, logType int) bool { + for re, count := range tl.logs[logType] { if re.FindStringIndex(s) != nil { - tl.errors[re]-- + tl.logs[logType][re]-- if count <= 1 { - delete(tl.errors, re) + delete(tl.logs[logType], re) } return true } @@ -227,18 +228,6 @@ func (tl *tLogger) expected(s string) bool { return false } -// expectedWarning determines if the warning string is protected or not. -func (tl *tLogger) expectedWarning(s string) { - for re, count := range tl.warnings { - if re.FindStringIndex(s) != nil { - tl.warnings[re]-- - if count <= 1 { - delete(tl.warnings, re) - } - } - } -} - func (tl *tLogger) Info(args ...any) { tl.log(infoLog, 0, "", args...) } diff --git a/internal/xds/xdsclient/xdsresource/xdsconfig.go b/internal/xds/xdsclient/xdsresource/xdsconfig.go index 2c553a43f063..cf712addd3d6 100644 --- a/internal/xds/xdsclient/xdsresource/xdsconfig.go +++ b/internal/xds/xdsclient/xdsresource/xdsconfig.go @@ -22,15 +22,18 @@ import "google.golang.org/grpc/resolver" // XDSConfig holds the complete gRPC client-side xDS configuration containing // all necessary resources. type XDSConfig struct { - // Listener holds the listener configuration. + // Listener holds the listener configuration. It is garunteed to be + // non-nil. Listener *ListenerUpdate // RouteConfig is the route configuration. It will be populated even if - // RouteConfig is inlined into the Listener resource. - RouteConfig RouteConfigUpdate + // RouteConfig is inlined into the Listener resource. It is garunteed to be + // non-nil. + RouteConfig *RouteConfigUpdate // VirtualHost selected from the route configuration whose domain field - // offers the best match against the provided dataplane authority. + // offers the best match against the provided dataplane authority. It is + // garunteed to be non-nil. VirtualHost *VirtualHost // Clusters is a map from cluster name to its configuration. @@ -49,13 +52,19 @@ type ClusterResult struct { // ClusterConfig contains configuration for a single cluster. type ClusterConfig struct { - Cluster ClusterUpdate // Cluster configuration. Always present. - EndpointConfig EndpointConfig // Endpoint configuration for leaf clusters. - AggregateConfig AggregateConfig // List of children for aggregate clusters. + // Cluster configuration for the cluster. This field is always set to a non-zero value + Cluster *ClusterUpdate + // Endpoint configuration for the cluster. This field is only set if the + // cluster is a leaf cluster. + EndpointConfig *EndpointConfig + // AggregateConfig is the set of leaf clusters for the cluster. This field + // is only set if the cluster is of type AGGREGATE. + AggregateConfig *AggregateConfig } -// AggregateConfig contains a list of leaf cluster names for . +// AggregateConfig holds the configuration for an aggregate cluster. type AggregateConfig struct { + // LeafClusters specifies the names of the leaf clusters for the cluster. LeafClusters []string } @@ -63,13 +72,20 @@ type AggregateConfig struct { // cluster. Only one of EDSUpdate or DNSEndpoints will be populated based on the // cluster type. type EndpointConfig struct { - EDSUpdate EndpointsUpdate // Configuration for EDS clusters. - DNSEndpoints DNSUpdate // Configuration for LOGICAL_DNS clusters. - ResolutionNote error // Error obtaining endpoints data. + // Endpoint configurartion for the EDS type cluster. + EDSUpdate *EndpointsUpdate + // Endpoint configuration for the LOGICAL_DNS type cluster. + DNSEndpoints DNSUpdate + // Stores error encountered while obtaining endpoints data for the cluster. + ResolutionNote error } -// DNSUpdate contains the update from DNS resolver. +// DNSUpdate represents the result of a DNS resolution, containing a +// list of discovered endpoints. This is only populated for the +// LOGICAL_DNS cluster type. type DNSUpdate struct { + // Endpoints is the complete list of endpoints returned by the + // DNS resolver. Endpoints []resolver.Endpoint } diff --git a/internal/xds/xdsdependencymanager/watch_service.go b/internal/xds/xdsdependencymanager/watch_service.go index a0dcb463ea91..130ccd7e3b1b 100644 --- a/internal/xds/xdsdependencymanager/watch_service.go +++ b/internal/xds/xdsdependencymanager/watch_service.go @@ -71,6 +71,11 @@ func (l *listenerWatcher) AmbientError(err error, onDone func()) { l.parent.serializer.ScheduleOr(handleError, onDone) } +// Stops the listenerWatcher. +// +// This method is not safe to be called concurrently. This method is guaranteed not to be called concurrently with +// resource callbacks. The dependency manager's Close() ensures all callbacks +// are drained before calling stop. func (l *listenerWatcher) stop() { l.isCancelled = true l.cancel() @@ -125,6 +130,13 @@ func (r *routeConfigWatcher) AmbientError(err error, onDone func()) { r.parent.serializer.ScheduleOr(handleError, onDone) } +// Stops the routeWatcher. +// +// This method is not safe to be called concurrently. +// +// It is designed to be called serially, either from a serialized resource +// callback or by the dependency manager's Close(), which drains all callbacks +// before calling. func (r *routeConfigWatcher) stop() { r.isCancelled = true r.cancel() diff --git a/internal/xds/xdsdependencymanager/xds_dependency_manager.go b/internal/xds/xdsdependencymanager/xds_dependency_manager.go index 8607ab3fcfc7..7c0bad645f80 100644 --- a/internal/xds/xdsdependencymanager/xds_dependency_manager.go +++ b/internal/xds/xdsdependencymanager/xds_dependency_manager.go @@ -59,6 +59,11 @@ type DependencyManager struct { serializer *grpcsync.CallbackSerializer serializerCancel context.CancelFunc + logger *grpclog.PrefixLogger + + // All the fields below are accessed only from within the context of + // serialized callbacks. + // dataplaneAuthority is the authority used for the data plane connections, // which is also used to select the VirtualHost within the xDS // RouteConfiguration. This is %-encoded to match with VirtualHost Domain @@ -70,11 +75,9 @@ type DependencyManager struct { currentListenerUpdate *xdsresource.ListenerUpdate rdsResourceName string - currentRouteConfig xdsresource.RouteConfigUpdate + currentRouteConfig *xdsresource.RouteConfigUpdate routeConfigWatcher *routeConfigWatcher currentVirtualHost *xdsresource.VirtualHost - - logger *grpclog.PrefixLogger } // New creates a new DependencyManager. @@ -141,7 +144,7 @@ func (m *DependencyManager) applyRouteConfigUpdate(update xdsresource.RouteConfi m.watcher.OnError(fmt.Errorf("could not find VirtualHost for %q", m.dataplaneAuthority)) return } - m.currentRouteConfig = update + m.currentRouteConfig = &update m.currentVirtualHost = matchVH m.maybeSendUpdate() } diff --git a/internal/xds/xdsdependencymanager/xds_dependency_manager_test.go b/internal/xds/xdsdependencymanager/xds_dependency_manager_test.go index b55d642daa99..b5f094671bb7 100644 --- a/internal/xds/xdsdependencymanager/xds_dependency_manager_test.go +++ b/internal/xds/xdsdependencymanager/xds_dependency_manager_test.go @@ -73,7 +73,7 @@ var ( Listener: &xdsresource.ListenerUpdate{ RouteConfigName: defaultTestRouteConfigName, HTTPFilters: []xdsresource.HTTPFilter{{Name: "router"}}}, - RouteConfig: xdsresource.RouteConfigUpdate{ + RouteConfig: &xdsresource.RouteConfigUpdate{ VirtualHosts: []*xdsresource.VirtualHost{ { Domains: []string{defaultTestServiceName}, @@ -279,7 +279,7 @@ func (s) TestInlineRouteConfig(t *testing.T) { }, }, HTTPFilters: []xdsresource.HTTPFilter{{Name: "router"}}}, - RouteConfig: xdsresource.RouteConfigUpdate{ + RouteConfig: &xdsresource.RouteConfigUpdate{ VirtualHosts: []*xdsresource.VirtualHost{ { Domains: []string{defaultTestServiceName}, From 45742dce9aa9b1bd24618fc3a83f9da83926c306 Mon Sep 17 00:00:00 2001 From: eshitachandwani Date: Wed, 29 Oct 2025 15:57:39 +0530 Subject: [PATCH 11/11] comments --- internal/xds/xdsclient/xdsresource/xdsconfig.go | 6 +++--- internal/xds/xdsdependencymanager/watch_service.go | 6 +++--- .../xdsdependencymanager/xds_dependency_manager.go | 11 +++++++++-- 3 files changed, 15 insertions(+), 8 deletions(-) diff --git a/internal/xds/xdsclient/xdsresource/xdsconfig.go b/internal/xds/xdsclient/xdsresource/xdsconfig.go index cf712addd3d6..7c2e00a29b54 100644 --- a/internal/xds/xdsclient/xdsresource/xdsconfig.go +++ b/internal/xds/xdsclient/xdsresource/xdsconfig.go @@ -22,18 +22,18 @@ import "google.golang.org/grpc/resolver" // XDSConfig holds the complete gRPC client-side xDS configuration containing // all necessary resources. type XDSConfig struct { - // Listener holds the listener configuration. It is garunteed to be + // Listener holds the listener configuration. It is guaranteed to be // non-nil. Listener *ListenerUpdate // RouteConfig is the route configuration. It will be populated even if - // RouteConfig is inlined into the Listener resource. It is garunteed to be + // RouteConfig is inlined into the Listener resource. It is guaranteed to be // non-nil. RouteConfig *RouteConfigUpdate // VirtualHost selected from the route configuration whose domain field // offers the best match against the provided dataplane authority. It is - // garunteed to be non-nil. + // guaranteed to be non-nil. VirtualHost *VirtualHost // Clusters is a map from cluster name to its configuration. diff --git a/internal/xds/xdsdependencymanager/watch_service.go b/internal/xds/xdsdependencymanager/watch_service.go index 130ccd7e3b1b..b26feec18f11 100644 --- a/internal/xds/xdsdependencymanager/watch_service.go +++ b/internal/xds/xdsdependencymanager/watch_service.go @@ -73,9 +73,9 @@ func (l *listenerWatcher) AmbientError(err error, onDone func()) { // Stops the listenerWatcher. // -// This method is not safe to be called concurrently. This method is guaranteed not to be called concurrently with -// resource callbacks. The dependency manager's Close() ensures all callbacks -// are drained before calling stop. +// This method is not safe to be called concurrently. It is currently designed +// to only be called in the dependency manager's Close() that ensures all +// callbacks are drained before calling stop. func (l *listenerWatcher) stop() { l.isCancelled = true l.cancel() diff --git a/internal/xds/xdsdependencymanager/xds_dependency_manager.go b/internal/xds/xdsdependencymanager/xds_dependency_manager.go index 7c0bad645f80..c84bde640e4b 100644 --- a/internal/xds/xdsdependencymanager/xds_dependency_manager.go +++ b/internal/xds/xdsdependencymanager/xds_dependency_manager.go @@ -82,8 +82,12 @@ type DependencyManager struct { // New creates a new DependencyManager. // -// listenerName is the name of the Listener resource to request from the management server. -// dataplaneAuthority is used to select the best matching virtual host from the route configuration received from the management server. +// - listenerName is the name of the Listener resource to request from the +// management server. +// - dataplaneAuthority is used to select the best matching virtual host from +// the route configuration received from the management server. +// - xdsClient is the xDS client to use to register resource watches. +// - watcher is the ConfigWatcher interface that will receive the aggregated XDSConfig. // // Configuration updates and/or errors are delivered to the watcher. func New(listenerName, dataplaneAuthority string, xdsClient xdsclient.XDSClient, watcher ConfigWatcher) *DependencyManager { @@ -130,6 +134,9 @@ func (m *DependencyManager) annotateErrorWithNodeID(err error) error { return fmt.Errorf("[xDS node id: %v]: %w", nodeID, err) } +// maybeSendUpdate checks that all the resources have been received and sends +// the current aggregated xDS configuration to the watcher if all the updates +// are available. func (m *DependencyManager) maybeSendUpdate() { m.watcher.OnUpdate(&xdsresource.XDSConfig{ Listener: m.currentListenerUpdate,