Skip to content

Commit

Permalink
Consistently lock and copy ads clients (istio#28968)
Browse files Browse the repository at this point in the history
* Consistently lock and copy ads clients

fixes istio#28958

* fix wrong reference

* optimize

* fix race
  • Loading branch information
howardjohn authored and Steven Landow committed Jan 5, 2021
1 parent 9aa291c commit b422b6a
Show file tree
Hide file tree
Showing 4 changed files with 25 additions and 70 deletions.
16 changes: 2 additions & 14 deletions pilot/pkg/xds/ads.go
Original file line number Diff line number Diff line change
Expand Up @@ -636,15 +636,12 @@ func (s *DiscoveryServer) adsClientCount() int {
func (s *DiscoveryServer) ProxyUpdate(clusterID, ip string) {
var connection *Connection

s.adsClientsMutex.RLock()
for _, v := range s.adsClients {
for _, v := range s.Clients() {
if v.proxy.Metadata.ClusterID == clusterID && v.proxy.IPAddresses[0] == ip {
connection = v
break
}

}
s.adsClientsMutex.RUnlock()

// It is possible that the envoy has not connected to this pilot, maybe connected to another pilot
if connection == nil {
Expand Down Expand Up @@ -705,17 +702,8 @@ func (s *DiscoveryServer) AdsPushAll(version string, req *model.PushRequest) {

// Send a signal to all connections, with a push event.
func (s *DiscoveryServer) startPush(req *model.PushRequest) {

// Push config changes, iterating over connected envoys. This covers ADS and EDS(0.7), both share
// the same connection table
s.adsClientsMutex.RLock()

// Create a temp map to avoid locking the add/remove
pending := make([]*Connection, 0, len(s.adsClients))
for _, v := range s.adsClients {
pending = append(pending, v)
}
s.adsClientsMutex.RUnlock()

if adsLog.DebugEnabled() {
currentlyPending := s.pushQueue.Pending()
Expand All @@ -724,7 +712,7 @@ func (s *DiscoveryServer) startPush(req *model.PushRequest) {
}
}
req.Start = time.Now()
for _, p := range pending {
for _, p := range s.Clients() {
s.pushQueue.Enqueue(p, req)
}
}
Expand Down
31 changes: 8 additions & 23 deletions pilot/pkg/xds/debug.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,8 +192,7 @@ func (s *DiscoveryServer) addDebugHandler(mux *http.ServeMux, path string, help
// Syncz dumps the synchronization status of all Envoys connected to this Pilot instance
func (s *DiscoveryServer) Syncz(w http.ResponseWriter, _ *http.Request) {
syncz := make([]SyncStatus, 0)
s.adsClientsMutex.RLock()
for _, con := range s.adsClients {
for _, con := range s.Clients() {
node := con.proxy
if node != nil {
syncz = append(syncz, SyncStatus{
Expand All @@ -210,7 +209,6 @@ func (s *DiscoveryServer) Syncz(w http.ResponseWriter, _ *http.Request) {
})
}
}
s.adsClientsMutex.RUnlock()
out, err := json.MarshalIndent(&syncz, "", " ")
if err != nil {
w.WriteHeader(http.StatusInternalServerError)
Expand Down Expand Up @@ -318,8 +316,7 @@ func (s *DiscoveryServer) distributedVersions(w http.ResponseWriter, req *http.R
proxyNamespace := req.URL.Query().Get("proxy_namespace")
knownVersions := make(map[string]string)
var results []SyncedVersions
s.adsClientsMutex.RLock()
for _, con := range s.adsClients {
for _, con := range s.Clients() {
// wrap this in independent scope so that panics don't bypass Unlock...
con.proxy.RLock()

Expand All @@ -337,7 +334,6 @@ func (s *DiscoveryServer) distributedVersions(w http.ResponseWriter, req *http.R
}
con.proxy.RUnlock()
}
s.adsClientsMutex.RUnlock()

out, err := json.MarshalIndent(&results, "", " ")
if err != nil {
Expand Down Expand Up @@ -448,18 +444,12 @@ func (s *DiscoveryServer) adsz(w http.ResponseWriter, req *http.Request) {
w.Header().Add("Content-Type", "application/json")
if req.Form.Get("push") != "" {
AdsPushAll(s)
s.adsClientsMutex.RLock()
_, _ = fmt.Fprintf(w, "Pushed to %d servers", len(s.adsClients))
s.adsClientsMutex.RUnlock()
_, _ = fmt.Fprintf(w, "Pushed to %d servers", s.adsClientCount())
return
}

s.adsClientsMutex.RLock()
clients := s.adsClients
s.adsClientsMutex.RUnlock()

adsClients := &AdsClients{}
for _, c := range clients {
for _, c := range s.Clients() {
adsClient := AdsClient{
ConnectionID: c.ConID,
ConnectedAt: c.Connect,
Expand Down Expand Up @@ -770,12 +760,9 @@ func (s *DiscoveryServer) ForceDisconnect(w http.ResponseWriter, req *http.Reque
}

func (s *DiscoveryServer) getProxyConnection(proxyID string) *Connection {
s.adsClientsMutex.RLock()
defer s.adsClientsMutex.RUnlock()

for conID := range s.adsClients {
if strings.Contains(conID, proxyID) {
return s.adsClients[conID]
for _, con := range s.Clients() {
if strings.Contains(con.ConID, proxyID) {
return con
}
}

Expand All @@ -787,15 +774,13 @@ func (s *DiscoveryServer) instancesz(w http.ResponseWriter, req *http.Request) {
w.Header().Add("Content-Type", "application/json")

instances := map[string][]*model.ServiceInstance{}
s.adsClientsMutex.RLock()
for _, con := range s.adsClients {
for _, con := range s.Clients() {
con.proxy.RLock()
if con.proxy != nil {
instances[con.proxy.ID] = con.proxy.ServiceInstances
}
con.proxy.RUnlock()
}
s.adsClientsMutex.RUnlock()
by, _ := json.MarshalIndent(instances, "", " ")

_, _ = w.Write(by)
Expand Down
12 changes: 12 additions & 0 deletions pilot/pkg/xds/discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -501,3 +501,15 @@ func (s *DiscoveryServer) initGenerators() {
func (s *DiscoveryServer) Shutdown() {
// Delegates to the push queue's ShutDown; this method touches no other server state.
s.pushQueue.ShutDown()
}

// Clients returns a snapshot of the connections registered in the ADS client table at the
// time of the call. The table is read under adsClientsMutex's read lock, so this method may
// be invoked from multiple goroutines. The returned slice is freshly allocated and owned by
// the caller, but the *Connection values in it are shared with the server: anything reached
// through them (ie model.Proxy) still needs its own locking. The order of the entries follows
// the iteration order of the table and should not be relied upon.
func (s *DiscoveryServer) Clients() []*Connection {
	s.adsClientsMutex.RLock()
	defer s.adsClientsMutex.RUnlock()

	// Size the snapshot exactly and fill it by index; the lock guarantees the table
	// cannot change between taking its length and finishing the copy.
	snapshot := make([]*Connection, len(s.adsClients))
	next := 0
	for _, conn := range s.adsClients {
		snapshot[next] = conn
		next++
	}
	return snapshot
}
36 changes: 3 additions & 33 deletions pilot/pkg/xds/internalgen.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (
status "github.com/envoyproxy/go-control-plane/envoy/service/status/v3"
"github.com/golang/protobuf/proto"
"github.com/golang/protobuf/ptypes/any"
structpb "github.com/golang/protobuf/ptypes/struct"
"golang.org/x/time/rate"

"istio.io/istio/pilot/pkg/features"
Expand Down Expand Up @@ -75,35 +74,13 @@ func NewInternalGen(s *DiscoveryServer) *InternalGen {
}

func (sg *InternalGen) OnConnect(con *Connection) {
if con.node.Metadata != nil && con.node.Metadata.Fields != nil {
con.node.Metadata.Fields["istiod"] = &structpb.Value{
Kind: &structpb.Value_StringValue{
StringValue: "TODO", // TODO: fill in the Istiod address - may include network, cluster, IP
},
}
con.node.Metadata.Fields["con"] = &structpb.Value{
Kind: &structpb.Value_StringValue{
StringValue: con.ConID,
},
}
}
sg.startPush(TypeURLConnections, []proto.Message{con.node})
}

func (sg *InternalGen) OnDisconnect(con *Connection) {
sg.QueueUnregisterWorkload(con.proxy)

sg.startPush(TypeURLDisconnect, []proto.Message{con.node})

if con.node.Metadata != nil && con.node.Metadata.Fields != nil {
con.node.Metadata.Fields["istiod"] = &structpb.Value{
Kind: &structpb.Value_StringValue{
StringValue: "", // TODO: using empty string to indicate this node has no istiod connection. We'll iterate.
},
}
}

// Note that it is quite possible for a 'connect' on a different istiod to happen before a disconnect.
}

func (sg *InternalGen) EnableWorkloadEntryController(store model.ConfigStoreCache) {
Expand Down Expand Up @@ -136,17 +113,15 @@ func (sg *InternalGen) OnNack(node *model.Proxy, dr *discovery.DiscoveryRequest)
func (s *DiscoveryServer) PushAll(res *discovery.DiscoveryResponse) {
// Push config changes, iterating over connected envoys. This covers ADS and EDS(0.7), both share
// the same connection table
s.adsClientsMutex.RLock()
// Create a temp slice to avoid locking the add/remove
pending := []*Connection{}
for _, v := range s.adsClients {
for _, v := range s.Clients() {
v.proxy.RLock()
if v.proxy.WatchedResources[res.TypeUrl] != nil {
pending = append(pending, v)
}
v.proxy.RUnlock()
}
s.adsClientsMutex.RUnlock()

// only marshal resources if there are connected clients
if len(pending) == 0 {
Expand Down Expand Up @@ -197,12 +172,9 @@ func (sg *InternalGen) Generate(proxy *model.Proxy, push *model.PushContext, w *

switch w.TypeUrl {
case TypeURLConnections:
sg.Server.adsClientsMutex.RLock()
// Create a temp map to avoid locking the add/remove
for _, v := range sg.Server.adsClients {
for _, v := range sg.Server.Clients() {
res = append(res, util.MessageToAny(v.node))
}
sg.Server.adsClientsMutex.RUnlock()
case TypeDebugSyncronization:
res = sg.debugSyncz()
case TypeDebugConfigDump:
Expand Down Expand Up @@ -239,8 +211,7 @@ func (sg *InternalGen) debugSyncz() []*any.Any {
v3.ClusterType,
}

sg.Server.adsClientsMutex.RLock()
for _, con := range sg.Server.adsClients {
for _, con := range sg.Server.Clients() {
con.proxy.RLock()
// Skip "nodes" without metadata (they are probably istioctl queries!)
if isProxy(con) {
Expand Down Expand Up @@ -274,7 +245,6 @@ func (sg *InternalGen) debugSyncz() []*any.Any {
}
con.proxy.RUnlock()
}
sg.Server.adsClientsMutex.RUnlock()

return res
}
Expand Down

0 comments on commit b422b6a

Please sign in to comment.