Skip to content
This repository has been archived by the owner on Nov 5, 2021. It is now read-only.

Commit

Permalink
Add RDS-cache support to file provider.
Browse files Browse the repository at this point in the history
If request contains if_modified_since field, use that field to decide whether to send all the resources in the response or not.

We always returns last_modified in the response. This timestamp corresponds to when lister's cache was refreshed from the actual file.

Ref: #641
& #634
PiperOrigin-RevId: 388502695
  • Loading branch information
manugarg committed Aug 3, 2021
1 parent 91d6fee commit 95bb927
Show file tree
Hide file tree
Showing 2 changed files with 182 additions and 14 deletions.
66 changes: 57 additions & 9 deletions rds/file/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"github.com/google/cloudprober/rds/server/filter"
"google.golang.org/protobuf/encoding/protojson"
"google.golang.org/protobuf/encoding/prototext"
"google.golang.org/protobuf/proto"
)

// DefaultProviderID is the povider id to use for this provider if a provider
Expand Down Expand Up @@ -70,14 +71,23 @@ type lister struct {
checkModTime bool
}

// ListResources returns the last successfully parsed list of resources.
func (ls *lister) ListResources(req *pb.ListResourcesRequest) (*pb.ListResourcesResponse, error) {
func (ls *lister) lastModified() int64 {
ls.mu.RLock()
defer ls.mu.RUnlock()
return ls.lastUpdated.Unix()
}

// listResources returns the last successfully parsed list of resources.
func (ls *lister) listResources(req *pb.ListResourcesRequest) (*pb.ListResourcesResponse, error) {
ls.mu.RLock()
defer ls.mu.RUnlock()

// If there are no filters, return early.
if len(req.GetFilter()) == 0 {
return &pb.ListResourcesResponse{Resources: append([]*pb.Resource{}, ls.resources...)}, nil
return &pb.ListResourcesResponse{
Resources: append([]*pb.Resource{}, ls.resources...),
LastModified: proto.Int64(ls.lastUpdated.Unix()),
}, nil
}

allFilters, err := filter.ParseFilters(req.GetFilter(), SupportedFilters.RegexFilterKeys, "")
Expand Down Expand Up @@ -105,7 +115,10 @@ func (ls *lister) ListResources(req *pb.ListResourcesRequest) (*pb.ListResources
}

ls.l.Infof("file.ListResources: returning %d resources out of %d", len(resources), len(ls.resources))
return &pb.ListResourcesResponse{Resources: resources}, nil
return &pb.ListResourcesResponse{
Resources: resources,
LastModified: proto.Int64(ls.lastUpdated.Unix()),
}, nil
}

func (ls *lister) parseFileContent(b []byte) ([]*pb.Resource, error) {
Expand Down Expand Up @@ -223,6 +236,20 @@ func newLister(filePath string, c *configpb.ProviderConfig, l *logger.Logger) (*
return ls, nil
}

func responseWithCacheCheck(ls *lister, req *pb.ListResourcesRequest) (*pb.ListResourcesResponse, error) {
if req.GetIfModifiedSince() == 0 {
return ls.listResources(req)
}

if lastModified := ls.lastModified(); lastModified <= req.GetIfModifiedSince() {
return &pb.ListResourcesResponse{
LastModified: proto.Int64(lastModified),
}, nil
}

return ls.listResources(req)
}

// ListResources returns the list of resources based on the given request.
func (p *Provider) ListResources(req *pb.ListResourcesRequest) (*pb.ListResourcesResponse, error) {
fPath := req.GetResourcePath()
Expand All @@ -231,27 +258,48 @@ func (p *Provider) ListResources(req *pb.ListResourcesRequest) (*pb.ListResource
if ls == nil {
return nil, fmt.Errorf("file path %s is not available on this server", fPath)
}
return ls.ListResources(req)
return responseWithCacheCheck(ls, req)
}

// Avoid append and another allocation if there is only one lister, most
// common use case.
if len(p.listers) == 1 {
for _, ls := range p.listers {
return ls.ListResources(req)
return responseWithCacheCheck(ls, req)
}
}

// If we are working with multiple listers, it's slightly more complicated.
// In that case we need to return all the listers' resources even if only one
// of them has changed.
//
// Get the latest last-modified.
lastModified := int64(0)
for _, ls := range p.listers {
listerLastModified := ls.lastModified()
if lastModified < listerLastModified {
lastModified = listerLastModified
}
}
resp := &pb.ListResourcesResponse{
LastModified: proto.Int64(lastModified),
}

// if nothing changed since req.IfModifiedSince, return early.
if req.GetIfModifiedSince() != 0 && lastModified <= req.GetIfModifiedSince() {
return resp, nil
}

var result []*pb.Resource
for _, fp := range p.filePaths {
res, err := p.listers[fp].ListResources(req)
res, err := p.listers[fp].listResources(req)
if err != nil {
return nil, err
}
result = append(result, res.Resources...)
}

return &pb.ListResourcesResponse{Resources: result}, nil
resp.Resources = result
return resp, nil
}

// Provider provides a file-based targets provider for RDS. It implements the
Expand Down
130 changes: 125 additions & 5 deletions rds/file/file_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ func BenchmarkListResources(b *testing.B) {
b.StartTimer()

for j := 0; j < b.N; j++ {
res, err := ls.ListResources(&rdspb.ListResourcesRequest{
res, err := ls.listResources(&rdspb.ListResourcesRequest{
Filter: filters,
})

Expand Down Expand Up @@ -192,7 +192,7 @@ func testModTimeCheckBehavior(t *testing.T, disableModTimeCheck bool) {
}

// Step 1: Very first run. File should be loaded.
res, err := ls.ListResources(nil)
res, err := ls.listResources(nil)
if err != nil {
t.Errorf("Unexxpected error: %v", err)
}
Expand All @@ -216,10 +216,11 @@ func testModTimeCheckBehavior(t *testing.T, disableModTimeCheck bool) {
t.Errorf("File unexpectly didn't reload. Update time: %v, last update time: %v", ls.lastUpdated, firstUpdateTime)
}
}
res, err = ls.ListResources(nil)
res, err = ls.listResources(nil)
if err != nil {
t.Errorf("Unexxpected error: %v", err)
t.Errorf("Unexpected error: %v", err)
}
wantResources.LastModified = proto.Int64(ls.lastModified())
if !proto.Equal(res, wantResources) {
t.Errorf("Got resources:\n%s\nWant resources:\n%s", res.String(), wantResources.String())
}
Expand All @@ -236,10 +237,11 @@ func testModTimeCheckBehavior(t *testing.T, disableModTimeCheck bool) {
if ls.lastUpdated.Before(fileModTime) {
t.Errorf("File lister last update time (%v) before file mod time (%v)", ls.lastUpdated, fileModTime)
}
res, err = ls.ListResources(nil)
res, err = ls.listResources(nil)
if err != nil {
t.Errorf("Unexxpected error: %v", err)
}
wantResources.LastModified = proto.Int64(ls.lastModified())
if !proto.Equal(res, wantResources) {
t.Errorf("Got resources:\n%s\nWant resources:\n%s", res.String(), wantResources.String())
}
Expand All @@ -254,3 +256,121 @@ func TestModTimeCheckBehavior(t *testing.T) {
testModTimeCheckBehavior(t, true)
})
}

func TestListResourcesWithCache(t *testing.T) {
// We test with a provider that contains two listers (created from textpb
// files above). We try accessing single lister (by setting resource path)
// and both listers.
tests := []struct {
desc string
filePaths [2]string // Lister's file paths.
listerLastModified [2]int64 // Last modified timestamp for listers.
ifModifiedSince int64 // Request's if_modified_since
resourcePath string // Request's resource path
wantResponse *rdspb.ListResourcesResponse
}{
{
desc: "no-caching-all-resources",
filePaths: [2]string{"testdata/targets1.textpb", "testdata/targets2.textpb"},
wantResponse: &rdspb.ListResourcesResponse{
Resources: testExpectedResources,
LastModified: proto.Int64(0),
},
},
{
desc: "non-zero-last-modified,return-all-resources",
filePaths: [2]string{"testdata/targets1.textpb", "testdata/targets2.textpb"},
listerLastModified: [2]int64{300, 314},
wantResponse: &rdspb.ListResourcesResponse{
Resources: testExpectedResources,
LastModified: proto.Int64(314),
},
},
{
desc: "if-modified-since-older-1,return-all-resources",
filePaths: [2]string{"testdata/targets1.textpb", "testdata/targets2.textpb"},
listerLastModified: [2]int64{300, 314},
ifModifiedSince: 300,
wantResponse: &rdspb.ListResourcesResponse{
Resources: testExpectedResources,
LastModified: proto.Int64(314),
},
},
{
desc: "if-modified-since-older-2,return-all-resources",
filePaths: [2]string{"testdata/targets1.textpb", "testdata/targets2.textpb"},
listerLastModified: [2]int64{300, 314},
ifModifiedSince: 302,
wantResponse: &rdspb.ListResourcesResponse{
Resources: testExpectedResources,
LastModified: proto.Int64(314),
},
},
{
desc: "one-resource-path-1st-file,cached",
filePaths: [2]string{"testdata/targets1.textpb", "testdata/targets2.textpb"},
listerLastModified: [2]int64{300, 314},
ifModifiedSince: 300,
resourcePath: "testdata/targets1.textpb",
wantResponse: &rdspb.ListResourcesResponse{
LastModified: proto.Int64(300),
},
},
{
desc: "one-resource-path-2nd-file,uncached",
filePaths: [2]string{"testdata/targets1.textpb", "testdata/targets2.textpb"},
listerLastModified: [2]int64{300, 314},
ifModifiedSince: 300,
resourcePath: "testdata/targets2.textpb",
wantResponse: &rdspb.ListResourcesResponse{
Resources: testExpectedResources[2:],
LastModified: proto.Int64(314),
},
},
{
desc: "if-modified-since-equal-no-resources",
ifModifiedSince: 314,
listerLastModified: [2]int64{300, 314},
wantResponse: &rdspb.ListResourcesResponse{
LastModified: proto.Int64(314),
},
},
{
desc: "if-modified-since-bigger-no-resources",
ifModifiedSince: 315,
listerLastModified: [2]int64{300, 314},
wantResponse: &rdspb.ListResourcesResponse{
LastModified: proto.Int64(314),
},
},
}

for _, test := range tests {
t.Run(test.desc, func(t *testing.T) {
p := &Provider{
filePaths: test.filePaths[:],
listers: make(map[string]*lister),
}

for i, fp := range test.filePaths {
ls, _ := newLister(fp, &configpb.ProviderConfig{}, nil)
ls.lastUpdated = time.Unix(test.listerLastModified[i], 0)
p.listers[fp] = ls
}

resp, err := p.ListResources(&rdspb.ListResourcesRequest{
ResourcePath: proto.String(test.resourcePath),
IfModifiedSince: proto.Int64(test.ifModifiedSince),
})

if err != nil {
t.Errorf("Got unexpected error: %v", err)
return
}

if !proto.Equal(resp, test.wantResponse) {
t.Errorf("Got response:\n%s\nwanted:\n%s", resp.String(), test.wantResponse.String())
}
})
}
}

0 comments on commit 95bb927

Please sign in to comment.