This repository has been archived by the owner on Nov 5, 2021. It is now read-only.

Improve GCE resources refresh mechanism.
A couple of things in this CL:

- Keep a per-zone/per-region cache. This allows us to make progress even if the API call for one zone or region fails.

- Shuffle the zones/regions list in each refresh cycle, so that we go through them in a different order every time. This may increase our chances of making progress in case of consistent API failures.

PiperOrigin-RevId: 334255401
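
The core of the change is a cache keyed by scope (zone or region) that is updated one scope at a time, with the scope order shuffled each cycle. Below is a minimal sketch of that pattern, not the commit's actual code; the `lister` type and `listScope` callback are hypothetical stand-ins for the real listers in the diff.

```go
package main

import (
	"fmt"
	"math/rand"
	"sync"
)

// lister is a hypothetical stand-in for the per-resource listers in this CL.
type lister struct {
	mu            sync.RWMutex
	namesPerScope map[string][]string // e.g. "us-central1": ["fr1", "fr2"]
}

// refresh updates the cache one scope (zone or region) at a time, so an
// API failure in one scope leaves every other scope's entries intact.
func (l *lister) refresh(scopes []string, listScope func(string) ([]string, error)) {
	// Shuffle so that a consistently failing scope doesn't always block
	// the same tail of the list.
	rand.Shuffle(len(scopes), func(i, j int) { scopes[i], scopes[j] = scopes[j], scopes[i] })

	for _, scope := range scopes {
		names, err := listScope(scope)
		if err != nil {
			continue // keep this scope's previous cache
		}
		l.mu.Lock()
		l.namesPerScope[scope] = names
		l.mu.Unlock()
	}
}

func main() {
	l := &lister{namesPerScope: map[string][]string{"us-central1": {"stale"}}}
	l.refresh([]string{"us-central1", "us-east1"}, func(scope string) ([]string, error) {
		if scope == "us-east1" {
			return nil, fmt.Errorf("API error") // us-central1 still refreshes
		}
		return []string{scope + "-fr1"}, nil
	})
	fmt.Println(l.namesPerScope)
}
```

With this shape, an error in one scope simply preserves that scope's previous entries, while every other scope still gets refreshed in the same cycle.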
manugarg committed Sep 29, 2020
1 parent afddefa commit 2afc352
Showing 3 changed files with 192 additions and 112 deletions.
121 changes: 75 additions & 46 deletions rds/gcp/forwarding_rules.go
@@ -64,10 +64,10 @@ type forwardingRulesLister struct {
thisInstance string
l *logger.Logger

mu sync.RWMutex // Mutex for names and cache
names []string
cache map[string]*frData
computeSvc *compute.Service
mu sync.RWMutex
namesPerScope map[string][]string // "us-central1": ["fr1", "fr2"]
cachePerScope map[string]map[string]*frData // "us-central1": {"fr1": data}
computeSvc *compute.Service
}

// listResources returns the list of resource records, where each record
@@ -86,67 +86,95 @@ func (frl *forwardingRulesLister) listResources(req *pb.ListResourcesRequest) ([
frl.mu.RLock()
defer frl.mu.RUnlock()

for _, name := range frl.names {
if nameFilter != nil && !nameFilter.Match(name, frl.l) {
continue
}
for region, names := range frl.namesPerScope {
cache := frl.cachePerScope[region]

if regionFilter != nil && !regionFilter.Match(frl.cache[name].region, frl.l) {
continue
}
for _, name := range names {
fr := cache[name]

resources = append(resources, &pb.Resource{
Name: proto.String(name),
Ip: proto.String(frl.cache[name].ip),
})
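// Defensive check: skip names with no cached entry instead of dereferencing a nil pointer.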
if fr == nil {
frl.l.Errorf("forwarding_rules: cached info missing for %s", name)
continue
}

if nameFilter != nil && !nameFilter.Match(name, frl.l) {
continue
}

if regionFilter != nil && !regionFilter.Match(cache[name].region, frl.l) {
continue
}

resources = append(resources, &pb.Resource{
Name: proto.String(name),
Ip: proto.String(cache[name].ip),
})
}
}

frl.l.Infof("forwarding_rules.listResources: returning %d forwarding rules", len(resources))
return resources, nil
}

func (frl *forwardingRulesLister) expandForRegion(region string) ([]string, map[string]*frData, error) {
var (
names []string
cache = make(map[string]*frData)
)

frList, err := frl.computeSvc.ForwardingRules.List(frl.project, region).Do()
if err != nil {
return nil, nil, err
}
for _, item := range frList.Items {
cache[item.Name] = &frData{
ip: item.IPAddress,
region: region,
}
names = append(names, item.Name)
}

return names, cache, nil
}

// expand runs API calls equivalent to "gcloud compute forwarding-rules list",
// and is used to populate the cache.
func (frl *forwardingRulesLister) expand(reEvalInterval time.Duration) {
frl.l.Debugf("forwarding_rules.expand: running for project: %s", frl.project)
frl.l.Debugf("forwarding_rules.expand: running for the project: %s", frl.project)

regionList, err := frl.computeSvc.Regions.List(frl.project).Filter(frl.c.GetRegionFilter()).Do()
if err != nil {
frl.l.Errorf("forwarding_rules.expand: error while getting list of all regions: %v", err)
return
}

// Temporary cache and names list.
var (
names []string
cache = make(map[string]*frData)
)
// Shuffle the regions list to change the order in each cycle.
rl := regionList.Items
rand.Seed(time.Now().UnixNano())
rand.Shuffle(len(rl), func(i, j int) { rl[i], rl[j] = rl[j], rl[i] })

frl.l.Infof("forwarding_rules.expand: expanding GCE targets for %d regions", len(rl))

sleepBetweenRegions := reEvalInterval / (2 * time.Duration(len(regionList.Items)+1))
for _, region := range regionList.Items {
frList, err := frl.computeSvc.ForwardingRules.List(frl.project, region.Name).Do()
var numItems int

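// Spread the per-region API calls over roughly half of the re-evaluation interval.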
sleepBetweenRegions := reEvalInterval / (2 * time.Duration(len(rl)+1))
for _, region := range rl {
names, cache, err := frl.expandForRegion(region.Name)
if err != nil {
frl.l.Errorf("forwarding_rules.expand: error while getting list of forwarding rules for region (%s): %v", region.Name, err)
return
}
for _, item := range frList.Items {
cache[item.Name] = &frData{
ip: item.IPAddress,
region: region.Name,
}
names = append(names, item.Name)
frl.l.Errorf("forwarding_rules.expand: error while listing forwarding rules in region (%s): %v", region.Name, err)
continue
}

frl.mu.Lock()
frl.cachePerScope[region.Name] = cache
frl.namesPerScope[region.Name] = names
frl.mu.Unlock()

numItems += len(names)
time.Sleep(sleepBetweenRegions)
}

// Note that we update the list of names only after all regions have been
// expanded successfully. This is to avoid replacing the current list with
// a partial expansion of targets.
frl.l.Infof("forwarding_rules.expand: got %d forwarding rules", len(names))
frl.mu.Lock()
frl.cache = cache
frl.names = names
frl.mu.Unlock()
frl.l.Infof("forwarding_rules.expand: got %d forwarding rules", numItems)
}

func newForwardingRulesLister(project, apiVersion string, c *configpb.ForwardingRules, l *logger.Logger) (*forwardingRulesLister, error) {
@@ -156,11 +184,12 @@ func newForwardingRulesLister(project, apiVersion string, c *configpb.Forwarding
}

frl := &forwardingRulesLister{
project: project,
c: c,
cache: make(map[string]*frData),
computeSvc: cs,
l: l,
project: project,
c: c,
cachePerScope: make(map[string]map[string]*frData),
namesPerScope: make(map[string][]string),
computeSvc: cs,
l: l,
}

reEvalInterval := time.Duration(c.GetReEvalSec()) * time.Second
148 changes: 89 additions & 59 deletions rds/gcp/gce_instances.go
@@ -1,4 +1,4 @@
// Copyright 2017-2018 Google Inc.
// Copyright 2017-2020 Google Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
@@ -70,10 +70,10 @@ type gceInstancesLister struct {
thisInstance string
l *logger.Logger

mu sync.RWMutex // Mutex for names and cache
names []string
cache map[string]*instanceData
computeSvc *compute.Service
mu sync.RWMutex
namesPerScope map[string][]string // "us-e1-b": ["i1", "i2"]
cachePerScope map[string]map[string]*instanceData // "us-e1-b": {"i1": data}
computeSvc *compute.Service
}

func instanceIP(nis []*compute.NetworkInterface, ipConfig *pb.IPConfig) (string, error) {
@@ -135,31 +135,41 @@ func (il *gceInstancesLister) listResources(req *pb.ListResourcesRequest) ([]*pb
il.mu.RLock()
defer il.mu.RUnlock()

for _, name := range il.names {
if nameFilter != nil && !nameFilter.Match(name, il.l) {
continue
}
if labelsFilter != nil && !labelsFilter.Match(il.cache[name].labels, il.l) {
continue
}
for zone, names := range il.namesPerScope {
cache := il.cachePerScope[zone]

nis := il.cache[name].nis
ip, err := instanceIP(nis, req.GetIpConfig())
if err != nil {
return nil, fmt.Errorf("gce_instances (instance %s): error while getting IP - %v", name, err)
}
for _, name := range names {
ins := cache[name]
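// Skip names with no cached data; dereferencing a nil entry would panic.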
if ins == nil {
il.l.Errorf("gce_instances: cached info missing for %s", name)
continue
}

if nameFilter != nil && !nameFilter.Match(name, il.l) {
continue
}
if labelsFilter != nil && !labelsFilter.Match(ins.labels, il.l) {
continue
}

nis := ins.nis
ip, err := instanceIP(nis, req.GetIpConfig())
if err != nil {
return nil, fmt.Errorf("gce_instances (instance %s): error while getting IP - %v", name, err)
}

resources = append(resources, &pb.Resource{
Name: proto.String(name),
Ip: proto.String(ip),
Labels: il.cache[name].labels,
// TODO(manugarg): Add support for returning instance id as well. I want to
// implement feature parity with the current targets first and then add
// more features.
})
resources = append(resources, &pb.Resource{
Name: proto.String(name),
Ip: proto.String(ip),
Labels: ins.labels,
// TODO(manugarg): Add support for returning instance id as well. I want to
// implement feature parity with the current targets first and then add
// more features.
})
}
}

il.l.Infof("gce_instances.listResources: returning %d instance", len(resources))
il.l.Infof("gce_instances.listResources: returning %d instances", len(resources))
return resources, nil
}

@@ -179,48 +189,67 @@ func defaultComputeService(apiVersion string) (*compute.Service, error) {
return cs, nil
}

func (il *gceInstancesLister) expandForZone(zone string) ([]string, map[string]*instanceData, error) {
var (
names []string
cache = make(map[string]*instanceData)
)

instanceList, err := il.computeSvc.Instances.List(il.project, zone).
Filter("status eq \"RUNNING\"").Do()
if err != nil {
return nil, nil, err
}
for _, item := range instanceList.Items {
if item.Name == il.thisInstance {
continue
}
cache[item.Name] = &instanceData{item.NetworkInterfaces, item.Labels}
names = append(names, item.Name)
}

return names, cache, nil
}

// expand runs API calls equivalent to "gcloud compute instances list",
// and is used to populate the cache.
func (il *gceInstancesLister) expand(reEvalInterval time.Duration) {
il.l.Infof("gce_instances.expand: expanding GCE targets for project: %s", il.project)
il.l.Infof("gce_instances.expand: running for the project: %s", il.project)

zonesList, err := il.computeSvc.Zones.List(il.project).Filter(il.c.GetZoneFilter()).Do()
if err != nil {
il.l.Errorf("gce_instances.expand: error while getting list of all zones: %v", err)
return
}

// Temporary cache and names list.
var (
names []string
cache = make(map[string]*instanceData)
)
sleepBetweenZones := reEvalInterval / (2 * time.Duration(len(zonesList.Items)+1))
for _, zone := range zonesList.Items {
instanceList, err := il.computeSvc.Instances.List(il.project, zone.Name).Filter("status eq \"RUNNING\"").Do()
// Shuffle the zones list to change the order in each cycle.
zl := zonesList.Items
rand.Seed(time.Now().UnixNano())
rand.Shuffle(len(zl), func(i, j int) { zl[i], zl[j] = zl[j], zl[i] })

il.l.Infof("gce_instances.expand: expanding GCE targets for %d zones", len(zl))

var numItems int

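// Spread the per-zone API calls over roughly half of the re-evaluation interval.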
sleepBetweenZones := reEvalInterval / (2 * time.Duration(len(zl)+1))

for _, zone := range zl {
names, cache, err := il.expandForZone(zone.Name)
if err != nil {
il.l.Errorf("gce_instances.expand: error while getting list of all instances: %v", err)
return
}
for _, item := range instanceList.Items {
if item.Name == il.thisInstance {
continue
}
cache[item.Name] = &instanceData{item.NetworkInterfaces, item.Labels}
names = append(names, item.Name)
il.l.Errorf("gce_instances.expand: error while listing instances in zone %s: %v", zone.Name, err)
continue
}

il.mu.Lock()
il.namesPerScope[zone.Name] = names
il.cachePerScope[zone.Name] = cache
il.mu.Unlock()

numItems += len(names)
time.Sleep(sleepBetweenZones)
}

// Note that we update the list of names only after all zones have been
// expanded successfully. This is to avoid replacing the current list with
// a partial expansion of targets. This is in contrast with the
// instance-to-NetInf cache, which is updated as we go through the instance list.
il.l.Infof("gce_instances.expand: got %d instances", len(names))
il.mu.Lock()
il.cache = cache
il.names = names
il.mu.Unlock()
il.l.Infof("gce_instances.expand: got %d instances", numItems)
}

func newGCEInstancesLister(project, apiVersion string, c *configpb.GCEInstances, l *logger.Logger) (*gceInstancesLister, error) {
@@ -240,12 +269,13 @@ func newGCEInstancesLister(project, apiVersion string, c *configpb.GCEInstances,
}

il := &gceInstancesLister{
project: project,
c: c,
thisInstance: thisInstance,
cache: make(map[string]*instanceData),
computeSvc: cs,
l: l,
project: project,
c: c,
thisInstance: thisInstance,
cachePerScope: make(map[string]map[string]*instanceData),
namesPerScope: make(map[string][]string),
computeSvc: cs,
l: l,
}

reEvalInterval := time.Duration(c.GetReEvalSec()) * time.Second
