From 3786ae1778f5bfcc49da81cbd925b7175ec81839 Mon Sep 17 00:00:00 2001 From: Zach Reyes <39203661+zasweq@users.noreply.github.com> Date: Mon, 6 Dec 2021 13:41:14 -0500 Subject: [PATCH] xds/resolver: Add support for cluster specifier plugins (#4987) * xds/resolver: Add support for cluster specifier plugins --- .../resolver/cluster_specifier_plugin_test.go | 368 ++++++++++++++++++ xds/internal/resolver/serviceconfig.go | 60 ++- xds/internal/resolver/watch_service.go | 11 +- xds/internal/resolver/xds_resolver.go | 1 - xds/internal/resolver/xds_resolver_test.go | 32 +- 5 files changed, 433 insertions(+), 39 deletions(-) create mode 100644 xds/internal/resolver/cluster_specifier_plugin_test.go diff --git a/xds/internal/resolver/cluster_specifier_plugin_test.go b/xds/internal/resolver/cluster_specifier_plugin_test.go new file mode 100644 index 000000000000..d432ad3c489d --- /dev/null +++ b/xds/internal/resolver/cluster_specifier_plugin_test.go @@ -0,0 +1,368 @@ +/* + * + * Copyright 2021 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package resolver + +import ( + "context" + "testing" + + "github.com/google/go-cmp/cmp" + "google.golang.org/grpc/balancer" + "google.golang.org/grpc/internal" + iresolver "google.golang.org/grpc/internal/resolver" + "google.golang.org/grpc/resolver" + "google.golang.org/grpc/serviceconfig" + "google.golang.org/grpc/xds/internal/balancer/clustermanager" + "google.golang.org/grpc/xds/internal/clusterspecifier" + "google.golang.org/grpc/xds/internal/xdsclient/xdsresource" +) + +func init() { + balancer.Register(cspB{}) +} + +type cspB struct{} + +func (cspB) Build(cc balancer.ClientConn, opts balancer.BuildOptions) balancer.Balancer { + return nil +} + +func (cspB) Name() string { + return "csp_experimental" +} + +type cspConfig struct { + ArbitraryField string `json:"arbitrary_field"` +} + +// TestXDSResolverClusterSpecifierPlugin tests that cluster specifier plugins +// produce the correct service config, and that the config selector routes to a +// cluster specifier plugin supported by this service config (i.e. prefixed with +// a cluster specifier plugin prefix). +func (s) TestXDSResolverClusterSpecifierPlugin(t *testing.T) { + xdsR, xdsC, tcc, cancel := testSetup(t, setupOpts{target: target}) + defer xdsR.Close() + defer cancel() + + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + waitForWatchListener(ctx, t, xdsC, targetStr) + xdsC.InvokeWatchListenerCallback(xdsresource.ListenerUpdate{RouteConfigName: routeStr, HTTPFilters: routerFilterList}, nil) + + waitForWatchRouteConfig(ctx, t, xdsC, routeStr) + xdsC.InvokeWatchRouteConfigCallback("", xdsresource.RouteConfigUpdate{ + VirtualHosts: []*xdsresource.VirtualHost{ + { + Domains: []string{targetStr}, + Routes: []*xdsresource.Route{{Prefix: newStringP(""), ClusterSpecifierPlugin: "cspA"}}, + }, + }, + // Top level csp config here - the value of cspA should get directly + // placed as a child policy of xds cluster manager. + ClusterSpecifierPlugins: map[string]clusterspecifier.BalancerConfig{"cspA": []map[string]interface{}{{"csp_experimental": cspConfig{ArbitraryField: "anything"}}}}, + }, nil) + + gotState, err := tcc.stateCh.Receive(ctx) + if err != nil { + t.Fatalf("Error waiting for UpdateState to be called: %v", err) + } + rState := gotState.(resolver.State) + if err := rState.ServiceConfig.Err; err != nil { + t.Fatalf("ClientConn.UpdateState received error in service config: %v", rState.ServiceConfig.Err) + } + wantJSON := `{"loadBalancingConfig":[{ + "xds_cluster_manager_experimental":{ + "children":{ + "cluster_specifier_plugin:cspA":{ + "childPolicy":[{"csp_experimental":{"arbitrary_field":"anything"}}] + } + } + }}]}` + + wantSCParsed := internal.ParseServiceConfigForTesting.(func(string) *serviceconfig.ParseResult)(wantJSON) + if !internal.EqualServiceConfigForTesting(rState.ServiceConfig.Config, wantSCParsed.Config) { + t.Errorf("ClientConn.UpdateState received different service config") + t.Error("got: ", cmp.Diff(nil, rState.ServiceConfig.Config)) + t.Fatal("want: ", cmp.Diff(nil, wantSCParsed.Config)) + } + + cs := iresolver.GetConfigSelector(rState) + if cs == nil { + t.Fatal("received nil config selector") + } + + res, err := cs.SelectConfig(iresolver.RPCInfo{Context: context.Background()}) + if err != nil { + t.Fatalf("Unexpected error from cs.SelectConfig(_): %v", err) + } + + cluster := clustermanager.GetPickedClusterForTesting(res.Context) + clusterWant := clusterSpecifierPluginPrefix + "cspA" + if cluster != clusterWant { + t.Fatalf("cluster: %+v, want: %+v", cluster, clusterWant) + } +} + +// TestXDSResolverClusterSpecifierPluginConfigUpdate tests that cluster +// specifier plugins produce the correct service config, and that on an update +// to the CSP Configuration, the new config is accounted for in the output +// service config. +func (s) TestXDSResolverClusterSpecifierPluginConfigUpdate(t *testing.T) { + xdsR, xdsC, tcc, cancel := testSetup(t, setupOpts{target: target}) + defer xdsR.Close() + defer cancel() + + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + waitForWatchListener(ctx, t, xdsC, targetStr) + xdsC.InvokeWatchListenerCallback(xdsresource.ListenerUpdate{RouteConfigName: routeStr, HTTPFilters: routerFilterList}, nil) + + waitForWatchRouteConfig(ctx, t, xdsC, routeStr) + xdsC.InvokeWatchRouteConfigCallback("", xdsresource.RouteConfigUpdate{ + VirtualHosts: []*xdsresource.VirtualHost{ + { + Domains: []string{targetStr}, + Routes: []*xdsresource.Route{{Prefix: newStringP(""), ClusterSpecifierPlugin: "cspA"}}, + }, + }, + // Top level csp config here - the value of cspA should get directly + // placed as a child policy of xds cluster manager. + ClusterSpecifierPlugins: map[string]clusterspecifier.BalancerConfig{"cspA": []map[string]interface{}{{"csp_experimental": cspConfig{ArbitraryField: "anything"}}}}, + }, nil) + + gotState, err := tcc.stateCh.Receive(ctx) + if err != nil { + t.Fatalf("Error waiting for UpdateState to be called: %v", err) + } + rState := gotState.(resolver.State) + if err := rState.ServiceConfig.Err; err != nil { + t.Fatalf("ClientConn.UpdateState received error in service config: %v", rState.ServiceConfig.Err) + } + wantJSON := `{"loadBalancingConfig":[{ + "xds_cluster_manager_experimental":{ + "children":{ + "cluster_specifier_plugin:cspA":{ + "childPolicy":[{"csp_experimental":{"arbitrary_field":"anything"}}] + } + } + }}]}` + + wantSCParsed := internal.ParseServiceConfigForTesting.(func(string) *serviceconfig.ParseResult)(wantJSON) + if !internal.EqualServiceConfigForTesting(rState.ServiceConfig.Config, wantSCParsed.Config) { + t.Errorf("ClientConn.UpdateState received different service config") + t.Error("got: ", cmp.Diff(nil, rState.ServiceConfig.Config)) + t.Fatal("want: ", cmp.Diff(nil, wantSCParsed.Config)) + } + + xdsC.InvokeWatchRouteConfigCallback("", xdsresource.RouteConfigUpdate{ + VirtualHosts: []*xdsresource.VirtualHost{ + { + Domains: []string{targetStr}, + Routes: []*xdsresource.Route{{Prefix: newStringP(""), ClusterSpecifierPlugin: "cspA"}}, + }, + }, + // Top level csp config here - the value of cspA should get directly + // placed as a child policy of xds cluster manager. + ClusterSpecifierPlugins: map[string]clusterspecifier.BalancerConfig{"cspA": []map[string]interface{}{{"csp_experimental": cspConfig{ArbitraryField: "changed"}}}}, + }, nil) + + gotState, err = tcc.stateCh.Receive(ctx) + if err != nil { + t.Fatalf("Error waiting for UpdateState to be called: %v", err) + } + rState = gotState.(resolver.State) + if err := rState.ServiceConfig.Err; err != nil { + t.Fatalf("ClientConn.UpdateState received error in service config: %v", rState.ServiceConfig.Err) + } + wantJSON = `{"loadBalancingConfig":[{ + "xds_cluster_manager_experimental":{ + "children":{ + "cluster_specifier_plugin:cspA":{ + "childPolicy":[{"csp_experimental":{"arbitrary_field":"changed"}}] + } + } + }}]}` + + wantSCParsed = internal.ParseServiceConfigForTesting.(func(string) *serviceconfig.ParseResult)(wantJSON) + if !internal.EqualServiceConfigForTesting(rState.ServiceConfig.Config, wantSCParsed.Config) { + t.Errorf("ClientConn.UpdateState received different service config") + t.Error("got: ", cmp.Diff(nil, rState.ServiceConfig.Config)) + t.Fatal("want: ", cmp.Diff(nil, wantSCParsed.Config)) + } +} + +// TestXDSResolverDelayedOnCommittedCSP tests that cluster specifier plugins and +// their corresponding configurations remain in service config if RPCs are in +// flight. +func (s) TestXDSResolverDelayedOnCommittedCSP(t *testing.T) { + xdsR, xdsC, tcc, cancel := testSetup(t, setupOpts{target: target}) + defer xdsR.Close() + defer cancel() + + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + waitForWatchListener(ctx, t, xdsC, targetStr) + xdsC.InvokeWatchListenerCallback(xdsresource.ListenerUpdate{RouteConfigName: routeStr, HTTPFilters: routerFilterList}, nil) + waitForWatchRouteConfig(ctx, t, xdsC, routeStr) + + xdsC.InvokeWatchRouteConfigCallback("", xdsresource.RouteConfigUpdate{ + VirtualHosts: []*xdsresource.VirtualHost{ + { + Domains: []string{targetStr}, + Routes: []*xdsresource.Route{{Prefix: newStringP(""), ClusterSpecifierPlugin: "cspA"}}, + }, + }, + // Top level csp config here - the value of cspA should get directly + // placed as a child policy of xds cluster manager. + ClusterSpecifierPlugins: map[string]clusterspecifier.BalancerConfig{"cspA": []map[string]interface{}{{"csp_experimental": cspConfig{ArbitraryField: "anythingA"}}}}, + }, nil) + + gotState, err := tcc.stateCh.Receive(ctx) + if err != nil { + t.Fatalf("Error waiting for UpdateState to be called: %v", err) + } + rState := gotState.(resolver.State) + if err := rState.ServiceConfig.Err; err != nil { + t.Fatalf("ClientConn.UpdateState received error in service config: %v", rState.ServiceConfig.Err) + } + wantJSON := `{"loadBalancingConfig":[{ + "xds_cluster_manager_experimental":{ + "children":{ + "cluster_specifier_plugin:cspA":{ + "childPolicy":[{"csp_experimental":{"arbitrary_field":"anythingA"}}] + } + } + }}]}` + + wantSCParsed := internal.ParseServiceConfigForTesting.(func(string) *serviceconfig.ParseResult)(wantJSON) + if !internal.EqualServiceConfigForTesting(rState.ServiceConfig.Config, wantSCParsed.Config) { + t.Errorf("ClientConn.UpdateState received different service config") + t.Error("got: ", cmp.Diff(nil, rState.ServiceConfig.Config)) + t.Fatal("want: ", cmp.Diff(nil, wantSCParsed.Config)) + } + + cs := iresolver.GetConfigSelector(rState) + if cs == nil { + t.Fatal("received nil config selector") + } + + res, err := cs.SelectConfig(iresolver.RPCInfo{Context: context.Background()}) + if err != nil { + t.Fatalf("Unexpected error from cs.SelectConfig(_): %v", err) + } + + cluster := clustermanager.GetPickedClusterForTesting(res.Context) + clusterWant := clusterSpecifierPluginPrefix + "cspA" + if cluster != clusterWant { + t.Fatalf("cluster: %+v, want: %+v", cluster, clusterWant) + } + // delay res.OnCommitted() + + // Perform TWO updates to ensure the old config selector does not hold a reference to cspA + xdsC.InvokeWatchRouteConfigCallback("", xdsresource.RouteConfigUpdate{ + VirtualHosts: []*xdsresource.VirtualHost{ + { + Domains: []string{targetStr}, + Routes: []*xdsresource.Route{{Prefix: newStringP(""), ClusterSpecifierPlugin: "cspB"}}, + }, + }, + // Top level csp config here - the value of cspB should get directly + // placed as a child policy of xds cluster manager. + ClusterSpecifierPlugins: map[string]clusterspecifier.BalancerConfig{"cspB": []map[string]interface{}{{"csp_experimental": cspConfig{ArbitraryField: "anythingB"}}}}, + }, nil) + tcc.stateCh.Receive(ctx) // Ignore the first update. + + xdsC.InvokeWatchRouteConfigCallback("", xdsresource.RouteConfigUpdate{ + VirtualHosts: []*xdsresource.VirtualHost{ + { + Domains: []string{targetStr}, + Routes: []*xdsresource.Route{{Prefix: newStringP(""), ClusterSpecifierPlugin: "cspB"}}, + }, + }, + // Top level csp config here - the value of cspB should get directly + // placed as a child policy of xds cluster manager. + ClusterSpecifierPlugins: map[string]clusterspecifier.BalancerConfig{"cspB": []map[string]interface{}{{"csp_experimental": cspConfig{ArbitraryField: "anythingB"}}}}, + }, nil) + + gotState, err = tcc.stateCh.Receive(ctx) + if err != nil { + t.Fatalf("Error waiting for UpdateState to be called: %v", err) + } + rState = gotState.(resolver.State) + if err := rState.ServiceConfig.Err; err != nil { + t.Fatalf("ClientConn.UpdateState received error in service config: %v", rState.ServiceConfig.Err) + } + wantJSON2 := `{"loadBalancingConfig":[{ + "xds_cluster_manager_experimental":{ + "children":{ + "cluster_specifier_plugin:cspA":{ + "childPolicy":[{"csp_experimental":{"arbitrary_field":"anythingA"}}] + }, + "cluster_specifier_plugin:cspB":{ + "childPolicy":[{"csp_experimental":{"arbitrary_field":"anythingB"}}] + } + } + }}]}` + + wantSCParsed2 := internal.ParseServiceConfigForTesting.(func(string) *serviceconfig.ParseResult)(wantJSON2) + if !internal.EqualServiceConfigForTesting(rState.ServiceConfig.Config, wantSCParsed2.Config) { + t.Errorf("ClientConn.UpdateState received different service config") + t.Error("got: ", cmp.Diff(nil, rState.ServiceConfig.Config)) + t.Fatal("want: ", cmp.Diff(nil, wantSCParsed2.Config)) + } + + // Invoke OnCommitted; should lead to a service config update that deletes + // cspA. + res.OnCommitted() + + xdsC.InvokeWatchRouteConfigCallback("", xdsresource.RouteConfigUpdate{ + VirtualHosts: []*xdsresource.VirtualHost{ + { + Domains: []string{targetStr}, + Routes: []*xdsresource.Route{{Prefix: newStringP(""), ClusterSpecifierPlugin: "cspB"}}, + }, + }, + // Top level csp config here - the value of cspB should get directly + // placed as a child policy of xds cluster manager. + ClusterSpecifierPlugins: map[string]clusterspecifier.BalancerConfig{"cspB": []map[string]interface{}{{"csp_experimental": cspConfig{ArbitraryField: "anythingB"}}}}, + }, nil) + gotState, err = tcc.stateCh.Receive(ctx) + if err != nil { + t.Fatalf("Error waiting for UpdateState to be called: %v", err) + } + rState = gotState.(resolver.State) + if err := rState.ServiceConfig.Err; err != nil { + t.Fatalf("ClientConn.UpdateState received error in service config: %v", rState.ServiceConfig.Err) + } + wantJSON3 := `{"loadBalancingConfig":[{ + "xds_cluster_manager_experimental":{ + "children":{ + "cluster_specifier_plugin:cspB":{ + "childPolicy":[{"csp_experimental":{"arbitrary_field":"anythingB"}}] + } + } + }}]}` + + wantSCParsed3 := internal.ParseServiceConfigForTesting.(func(string) *serviceconfig.ParseResult)(wantJSON3) + if !internal.EqualServiceConfigForTesting(rState.ServiceConfig.Config, wantSCParsed3.Config) { + t.Errorf("ClientConn.UpdateState received different service config") + t.Error("got: ", cmp.Diff(nil, rState.ServiceConfig.Config)) + t.Fatal("want: ", cmp.Diff(nil, wantSCParsed3.Config)) + } +} diff --git a/xds/internal/resolver/serviceconfig.go b/xds/internal/resolver/serviceconfig.go index 772873092107..fd75af210457 100644 --- a/xds/internal/resolver/serviceconfig.go +++ b/xds/internal/resolver/serviceconfig.go @@ -44,8 +44,10 @@ import ( ) const ( - cdsName = "cds_experimental" - xdsClusterManagerName = "xds_cluster_manager_experimental" + cdsName = "cds_experimental" + xdsClusterManagerName = "xds_cluster_manager_experimental" + clusterPrefix = "cluster:" + clusterSpecifierPluginPrefix = "cluster_specifier_plugin:" ) type serviceConfig struct { @@ -86,10 +88,8 @@ func (r *xdsResolver) pruneActiveClusters() { func serviceConfigJSON(activeClusters map[string]*clusterInfo) ([]byte, error) { // Generate children (all entries in activeClusters). children := make(map[string]xdsChildConfig) - for cluster := range activeClusters { - children[cluster] = xdsChildConfig{ - ChildPolicy: newBalancerConfig(cdsName, cdsBalancerConfig{Cluster: cluster}), - } + for cluster, ci := range activeClusters { + children[cluster] = ci.cfg } sc := serviceConfig{ @@ -158,10 +158,12 @@ func (cs *configSelector) SelectConfig(rpcInfo iresolver.RPCInfo) (*iresolver.RP if rt == nil || rt.clusters == nil { return nil, errNoMatchedRouteFound } + cluster, ok := rt.clusters.Next().(*routeCluster) if !ok { return nil, status.Errorf(codes.Internal, "error retrieving cluster for match: %v (%T)", cluster, cluster) } + // Add a ref to the selected cluster, as this RPC needs this cluster until // it is committed. ref := &cs.clusters[cluster.name].refCount @@ -353,21 +355,25 @@ func (r *xdsResolver) newConfigSelector(su serviceUpdate) (*configSelector, erro for i, rt := range su.virtualHost.Routes { clusters := newWRR() - for cluster, wc := range rt.WeightedClusters { + if rt.ClusterSpecifierPlugin != "" { + clusterName := clusterSpecifierPluginPrefix + rt.ClusterSpecifierPlugin clusters.Add(&routeCluster{ - name: cluster, - httpFilterConfigOverride: wc.HTTPFilterConfigOverride, - }, int64(wc.Weight)) - - // Initialize entries in cs.clusters map, creating entries in - // r.activeClusters as necessary. Set to zero as they will be - // incremented by incRefs. - ci := r.activeClusters[cluster] - if ci == nil { - ci = &clusterInfo{refCount: 0} - r.activeClusters[cluster] = ci + name: clusterName, + }, 1) + cs.initializeCluster(clusterName, xdsChildConfig{ + ChildPolicy: balancerConfig(su.clusterSpecifierPlugins[rt.ClusterSpecifierPlugin]), + }) + } else { + for cluster, wc := range rt.WeightedClusters { + clusterName := clusterPrefix + cluster + clusters.Add(&routeCluster{ + name: clusterName, + httpFilterConfigOverride: wc.HTTPFilterConfigOverride, + }, int64(wc.Weight)) + cs.initializeCluster(clusterName, xdsChildConfig{ + ChildPolicy: newBalancerConfig(cdsName, cdsBalancerConfig{Cluster: cluster}), + }) } - cs.clusters[cluster] = ci } cs.routes[i].clusters = clusters @@ -397,9 +403,25 @@ func (r *xdsResolver) newConfigSelector(su serviceUpdate) (*configSelector, erro return cs, nil } +// initializeCluster initializes entries in cs.clusters map, creating entries in +// r.activeClusters as necessary. Any created entries will have a ref count set +// to zero as their ref count will be incremented by incRefs. +func (cs *configSelector) initializeCluster(clusterName string, cfg xdsChildConfig) { + ci := cs.r.activeClusters[clusterName] + if ci == nil { + ci = &clusterInfo{refCount: 0} + cs.r.activeClusters[clusterName] = ci + } + cs.clusters[clusterName] = ci + cs.clusters[clusterName].cfg = cfg +} + type clusterInfo struct { // number of references to this cluster; accessed atomically refCount int32 + // cfg is the child configuration for this cluster, containing either the + // csp config or the cds cluster config. + cfg xdsChildConfig } type interceptorList struct { diff --git a/xds/internal/resolver/watch_service.go b/xds/internal/resolver/watch_service.go index 30f65727d08a..3db9be1cac07 100644 --- a/xds/internal/resolver/watch_service.go +++ b/xds/internal/resolver/watch_service.go @@ -25,6 +25,7 @@ import ( "google.golang.org/grpc/internal/grpclog" "google.golang.org/grpc/internal/pretty" + "google.golang.org/grpc/xds/internal/clusterspecifier" "google.golang.org/grpc/xds/internal/xdsclient" "google.golang.org/grpc/xds/internal/xdsclient/xdsresource" ) @@ -35,6 +36,9 @@ import ( type serviceUpdate struct { // virtualHost contains routes and other configuration to route RPCs. virtualHost *xdsresource.VirtualHost + // clusterSpecifierPlugins contains the configurations for any cluster + // specifier plugins emitted by the xdsclient. + clusterSpecifierPlugins map[string]clusterspecifier.BalancerConfig // ldsConfig contains configuration that applies to all routes. ldsConfig ldsConfig } @@ -120,7 +124,7 @@ func (w *serviceUpdateWatcher) handleLDSResp(update xdsresource.ListenerUpdate, } // Handle the inline RDS update as if it's from an RDS watch. - w.updateVirtualHostsFromRDS(*update.InlineRouteConfig) + w.applyRouteConfigUpdate(*update.InlineRouteConfig) return } @@ -151,7 +155,7 @@ func (w *serviceUpdateWatcher) handleLDSResp(update xdsresource.ListenerUpdate, w.rdsCancel = w.c.WatchRouteConfig(update.RouteConfigName, w.handleRDSResp) } -func (w *serviceUpdateWatcher) updateVirtualHostsFromRDS(update xdsresource.RouteConfigUpdate) { +func (w *serviceUpdateWatcher) applyRouteConfigUpdate(update xdsresource.RouteConfigUpdate) { matchVh := xdsresource.FindBestMatchingVirtualHost(w.serviceName, update.VirtualHosts) if matchVh == nil { // No matching virtual host found. @@ -160,6 +164,7 @@ func (w *serviceUpdateWatcher) updateVirtualHostsFromRDS(update xdsresource.Rout } w.lastUpdate.virtualHost = matchVh + w.lastUpdate.clusterSpecifierPlugins = update.ClusterSpecifierPlugins w.serviceCb(w.lastUpdate, nil) } @@ -179,7 +184,7 @@ func (w *serviceUpdateWatcher) handleRDSResp(update xdsresource.RouteConfigUpdat w.serviceCb(serviceUpdate{}, err) return } - w.updateVirtualHostsFromRDS(update) + w.applyRouteConfigUpdate(update) } func (w *serviceUpdateWatcher) close() { diff --git a/xds/internal/resolver/xds_resolver.go b/xds/internal/resolver/xds_resolver.go index 2192051ae2f6..6788090e29c0 100644 --- a/xds/internal/resolver/xds_resolver.go +++ b/xds/internal/resolver/xds_resolver.go @@ -205,7 +205,6 @@ func (r *xdsResolver) sendNewServiceConfig(cs *configSelector) bool { return true } - // Produce the service config. sc, err := serviceConfigJSON(r.activeClusters) if err != nil { // JSON marshal error; should never happen. diff --git a/xds/internal/resolver/xds_resolver_test.go b/xds/internal/resolver/xds_resolver_test.go index df4f47803713..c5fa3b8f7493 100644 --- a/xds/internal/resolver/xds_resolver_test.go +++ b/xds/internal/resolver/xds_resolver_test.go @@ -473,12 +473,12 @@ func (s) TestXDSResolverGoodServiceUpdate(t *testing.T) { wantJSON: `{"loadBalancingConfig":[{ "xds_cluster_manager_experimental":{ "children":{ - "test-cluster-1":{ + "cluster:test-cluster-1":{ "childPolicy":[{"cds_experimental":{"cluster":"test-cluster-1"}}] } } }}]}`, - wantClusters: map[string]bool{"test-cluster-1": true}, + wantClusters: map[string]bool{"cluster:test-cluster-1": true}, }, { routes: []*xdsresource.Route{{Prefix: newStringP(""), WeightedClusters: map[string]xdsresource.WeightedCluster{ @@ -491,18 +491,18 @@ func (s) TestXDSResolverGoodServiceUpdate(t *testing.T) { wantJSON: `{"loadBalancingConfig":[{ "xds_cluster_manager_experimental":{ "children":{ - "test-cluster-1":{ + "cluster:test-cluster-1":{ "childPolicy":[{"cds_experimental":{"cluster":"test-cluster-1"}}] }, - "cluster_1":{ + "cluster:cluster_1":{ "childPolicy":[{"cds_experimental":{"cluster":"cluster_1"}}] }, - "cluster_2":{ + "cluster:cluster_2":{ "childPolicy":[{"cds_experimental":{"cluster":"cluster_2"}}] } } }}]}`, - wantClusters: map[string]bool{"cluster_1": true, "cluster_2": true}, + wantClusters: map[string]bool{"cluster:cluster_1": true, "cluster:cluster_2": true}, }, { routes: []*xdsresource.Route{{Prefix: newStringP(""), WeightedClusters: map[string]xdsresource.WeightedCluster{ @@ -515,15 +515,15 @@ func (s) TestXDSResolverGoodServiceUpdate(t *testing.T) { wantJSON: `{"loadBalancingConfig":[{ "xds_cluster_manager_experimental":{ "children":{ - "cluster_1":{ + "cluster:cluster_1":{ "childPolicy":[{"cds_experimental":{"cluster":"cluster_1"}}] }, - "cluster_2":{ + "cluster:cluster_2":{ "childPolicy":[{"cds_experimental":{"cluster":"cluster_2"}}] } } }}]}`, - wantClusters: map[string]bool{"cluster_1": true, "cluster_2": true}, + wantClusters: map[string]bool{"cluster:cluster_1": true, "cluster:cluster_2": true}, }, } { // Invoke the watchAPI callback with a good service update and wait for the @@ -725,7 +725,7 @@ func (s) TestXDSResolverRemovedResource(t *testing.T) { wantJSON := `{"loadBalancingConfig":[{ "xds_cluster_manager_experimental":{ "children":{ - "test-cluster-1":{ + "cluster:test-cluster-1":{ "childPolicy":[{"cds_experimental":{"cluster":"test-cluster-1"}}] } } @@ -857,7 +857,7 @@ func (s) TestXDSResolverWRR(t *testing.T) { picks[clustermanager.GetPickedClusterForTesting(res.Context)]++ res.OnCommitted() } - want := map[string]int{"A": 10, "B": 20} + want := map[string]int{"cluster:A": 10, "cluster:B": 20} if !reflect.DeepEqual(picks, want) { t.Errorf("picked clusters = %v; want %v", picks, want) } @@ -987,7 +987,7 @@ func (s) TestXDSResolverDelayedOnCommitted(t *testing.T) { wantJSON := `{"loadBalancingConfig":[{ "xds_cluster_manager_experimental":{ "children":{ - "test-cluster-1":{ + "cluster:test-cluster-1":{ "childPolicy":[{"cds_experimental":{"cluster":"test-cluster-1"}}] } } @@ -1009,7 +1009,7 @@ func (s) TestXDSResolverDelayedOnCommitted(t *testing.T) { t.Fatalf("Unexpected error from cs.SelectConfig(_): %v", err) } cluster := clustermanager.GetPickedClusterForTesting(res.Context) - if cluster != "test-cluster-1" { + if cluster != "cluster:test-cluster-1" { t.Fatalf("") } // delay res.OnCommitted() @@ -1046,10 +1046,10 @@ func (s) TestXDSResolverDelayedOnCommitted(t *testing.T) { wantJSON2 := `{"loadBalancingConfig":[{ "xds_cluster_manager_experimental":{ "children":{ - "test-cluster-1":{ + "cluster:test-cluster-1":{ "childPolicy":[{"cds_experimental":{"cluster":"test-cluster-1"}}] }, - "NEW":{ + "cluster:NEW":{ "childPolicy":[{"cds_experimental":{"cluster":"NEW"}}] } } @@ -1084,7 +1084,7 @@ func (s) TestXDSResolverDelayedOnCommitted(t *testing.T) { wantJSON3 := `{"loadBalancingConfig":[{ "xds_cluster_manager_experimental":{ "children":{ - "NEW":{ + "cluster:NEW":{ "childPolicy":[{"cds_experimental":{"cluster":"NEW"}}] } }