Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

perf(backups): Reduce latency of list backups #7435

Merged
merged 4 commits into from
Feb 19, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
74 changes: 40 additions & 34 deletions worker/backup_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,11 @@ import (
"io"
"net/url"
"sort"
"sync"

"github.com/dgraph-io/dgraph/protos/pb"
"github.com/dgraph-io/dgraph/x"
"golang.org/x/sync/errgroup"

"github.com/pkg/errors"
)
Expand Down Expand Up @@ -98,19 +100,6 @@ type UriHandler interface {
ReadManifest(string, *Manifest) error
}

// getHandler returns a UriHandler for the URI scheme.
func getHandler(scheme string, creds *x.MinioCredentials) UriHandler {
switch scheme {
case "file", "":
return &fileHandler{}
case "minio", "s3":
return &s3Handler{
creds: creds,
}
}
return nil
}

// NewUriHandler parses the requested URI and finds the corresponding UriHandler.
// If the passed credentials are not nil, they will be used to override the
// default credentials (only for backups to minio or S3).
Expand All @@ -137,12 +126,14 @@ func getHandler(scheme string, creds *x.MinioCredentials) UriHandler {
// file:///tmp/dgraph/backups
// /tmp/dgraph/backups?compress=gzip
func NewUriHandler(uri *url.URL, creds *x.MinioCredentials) (UriHandler, error) {
h := getHandler(uri.Scheme, creds)
if h == nil {
return nil, errors.Errorf("Unable to handle url: %s", uri)
switch uri.Scheme {
case "file", "":
return &fileHandler{}, nil
case "minio", "s3":
return NewS3Handler(uri, creds)
}
return nil, errors.Errorf("Unable to handle url: %s", uri)

return h, nil
}

// loadFn is a function that will receive the current file being read.
Expand All @@ -160,8 +151,8 @@ func LoadBackup(location, backupId string, backupNum uint64, creds *x.MinioCrede
return LoadResult{Err: err}
}

h := getHandler(uri.Scheme, creds)
if h == nil {
h, err := NewUriHandler(uri, creds)
if err != nil {
return LoadResult{Err: errors.Errorf("Unsupported URI: %v", uri)}
}

Expand All @@ -176,9 +167,9 @@ func VerifyBackup(req *pb.RestoreRequest, creds *x.MinioCredentials, currentGrou
return err
}

h := getHandler(uri.Scheme, creds)
if h == nil {
return errors.Errorf("Unsupported URI: %v", uri)
h, err := NewUriHandler(uri, creds)
if err != nil {
return errors.Wrap(err, "VerifyBackup")
}

return h.Verify(uri, req, currentGroups)
Expand All @@ -191,27 +182,42 @@ func ListBackupManifests(l string, creds *x.MinioCredentials) (map[string]*Manif
return nil, err
}

h := getHandler(uri.Scheme, creds)
if h == nil {
return nil, errors.Errorf("Unsupported URI: %v", uri)
h, err := NewUriHandler(uri, creds)
if err != nil {
return nil, errors.Wrap(err, "ListBackupManifests")
}

paths, err := h.ListManifests(uri)
if err != nil {
return nil, err
}

listedManifests := make(map[string]*Manifest)
for _, path := range paths {
var m Manifest
if err := h.ReadManifest(path, &m); err != nil {
return nil, errors.Wrapf(err, "While reading %q", path)
}
m.Path = path
listedManifests[path] = &m
res := struct {
sync.Mutex
listedManifests map[string]*Manifest
}{
listedManifests: make(map[string]*Manifest),
}

return listedManifests, nil
var g errgroup.Group
for _, path := range paths {
path := path // https://golang.org/doc/faq#closures_and_goroutines
g.Go(func() error {
var m Manifest
if err := h.ReadManifest(path, &m); err != nil {
return errors.Wrapf(err, "ReadManifest: path=%q", path)
}
m.Path = path
res.Lock()
res.listedManifests[path] = &m
res.Unlock()
return nil
})
}
if err := g.Wait(); err != nil {
return nil, err
}
return res.listedManifests, nil
}

// filterManifests takes a list of manifests and returns the list of manifests
Expand Down
17 changes: 0 additions & 17 deletions worker/backup_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,23 +18,6 @@ import (
"github.com/stretchr/testify/require"
)

func TestGetHandler(t *testing.T) {
tests := []struct {
in string
out UriHandler
}{
{in: "file", out: &fileHandler{}},
{in: "minio", out: &s3Handler{}},
{in: "s3", out: &s3Handler{}},
{in: "", out: &fileHandler{}},
{in: "something", out: nil},
}
for _, tc := range tests {
actual := getHandler(tc.in, nil)
require.Equal(t, tc.out, actual)
}
}

func TestFilterManifestDefault(t *testing.T) {
manifests := []*Manifest{
{
Expand Down
77 changes: 21 additions & 56 deletions worker/s3_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,20 +63,24 @@ type s3Handler struct {
cerr chan error
creds *x.MinioCredentials
uri *url.URL
mc *x.MinioClient
}

// setup creates a new session, checks valid bucket at uri.Path, and configures a minio client.
// setup also fills in values used by the handler in subsequent calls.
// Returns a new S3 minio client, otherwise a nil client with an error.
func (h *s3Handler) setup(uri *url.URL) (*x.MinioClient, error) {
mc, err := x.NewMinioClient(uri, h.creds)
func NewS3Handler(uri *url.URL, creds *x.MinioCredentials) (*s3Handler, error) {
h := &s3Handler{
creds: creds,
uri: uri,
}
mc, err := x.NewMinioClient(uri, creds)
if err != nil {
return nil, err
}

h.bucketName, h.objectPrefix, err = mc.ValidateBucket(uri)

return mc, err
h.mc = mc
h.bucketName, h.objectPrefix = mc.ParseBucketAndPrefix(uri.Path)
return h, nil
}

func (h *s3Handler) createObject(uri *url.URL, req *pb.BackupRequest, mc *x.MinioClient,
Expand All @@ -97,17 +101,12 @@ func (h *s3Handler) createObject(uri *url.URL, req *pb.BackupRequest, mc *x.Mini
// GetLatestManifest reads the manifests at the given URL and returns the
// latest manifest.
func (h *s3Handler) GetLatestManifest(uri *url.URL) (*Manifest, error) {
mc, err := h.setup(uri)
if err != nil {
return nil, err
}

// Find the max Since value from the latest backup.
var lastManifest string
done := make(chan struct{})
defer close(done)
suffix := "/" + backupManifest
for object := range mc.ListObjects(h.bucketName, h.objectPrefix, true, done) {
for object := range h.mc.ListObjects(h.bucketName, h.objectPrefix, true, done) {
if strings.HasSuffix(object.Key, suffix) && object.Key > lastManifest {
lastManifest = object.Key
}
Expand All @@ -118,7 +117,7 @@ func (h *s3Handler) GetLatestManifest(uri *url.URL) (*Manifest, error) {
return &m, nil
}

if err := h.readManifest(mc, lastManifest, &m); err != nil {
if err := h.ReadManifest(lastManifest, &m); err != nil {
return nil, err
}
return &m, nil
Expand All @@ -133,53 +132,28 @@ func (h *s3Handler) GetLatestManifest(uri *url.URL) (*Manifest, error) {
func (h *s3Handler) CreateBackupFile(uri *url.URL, req *pb.BackupRequest) error {
glog.V(2).Infof("S3Handler got uri: %+v. Host: %s. Path: %s\n", uri, uri.Host, uri.Path)

mc, err := h.setup(uri)
if err != nil {
return err
}

objectName := backupName(req.ReadTs, req.GroupId)
h.createObject(uri, req, mc, objectName)
h.createObject(uri, req, h.mc, objectName)
return nil
}

// CreateManifest finishes a backup by creating an object to store the manifest.
func (h *s3Handler) CreateManifest(uri *url.URL, req *pb.BackupRequest) error {
glog.V(2).Infof("S3Handler got uri: %+v. Host: %s. Path: %s\n", uri, uri.Host, uri.Path)

mc, err := h.setup(uri)
if err != nil {
return err
}

h.createObject(uri, req, mc, backupManifest)
h.createObject(uri, req, h.mc, backupManifest)
return nil
}

// readManifest reads a manifest file at path using the handler.
// Returns nil on success, otherwise an error.
func (h *s3Handler) readManifest(mc *x.MinioClient, object string, m *Manifest) error {
reader, err := mc.GetObject(h.bucketName, object, minio.GetObjectOptions{})
if err != nil {
return err
}
defer reader.Close()
return json.NewDecoder(reader).Decode(m)
}

func (h *s3Handler) GetManifests(uri *url.URL, backupId string,
backupNum uint64) ([]*Manifest, error) {
mc, err := h.setup(uri)
if err != nil {
return nil, err
}

var paths []string
doneCh := make(chan struct{})
defer close(doneCh)

suffix := "/" + backupManifest
for object := range mc.ListObjects(h.bucketName, h.objectPrefix, true, doneCh) {
for object := range h.mc.ListObjects(h.bucketName, h.objectPrefix, true, doneCh) {
if strings.HasSuffix(object.Key, suffix) {
paths = append(paths, object.Key)
}
Expand All @@ -194,7 +168,7 @@ func (h *s3Handler) GetManifests(uri *url.URL, backupId string,
var manifests []*Manifest
for _, path := range paths {
var m Manifest
if err := h.readManifest(mc, path, &m); err != nil {
if err := h.ReadManifest(path, &m); err != nil {
return nil, errors.Wrapf(err, "while reading %q", path)
}
m.Path = path
Expand All @@ -213,11 +187,6 @@ func (h *s3Handler) Load(uri *url.URL, backupId string, backupNum uint64, fn loa
return LoadResult{Err: errors.Wrapf(err, "while retrieving manifests")}
}

mc, err := h.setup(uri)
if err != nil {
return LoadResult{Err: err}
}

// since is returned with the max manifest Since value found.
var since uint64

Expand All @@ -233,7 +202,7 @@ func (h *s3Handler) Load(uri *url.URL, backupId string, backupNum uint64, fn loa
path := filepath.Dir(manifests[i].Path)
for gid := range manifest.Groups {
object := filepath.Join(path, backupName(manifest.Since, gid))
reader, err := mc.GetObject(h.bucketName, object, minio.GetObjectOptions{})
reader, err := h.mc.GetObject(h.bucketName, object, minio.GetObjectOptions{})
if err != nil {
return LoadResult{Err: errors.Wrapf(err, "Failed to get %q", object)}
}
Expand Down Expand Up @@ -281,18 +250,14 @@ func (h *s3Handler) Verify(uri *url.URL, req *pb.RestoreRequest, currentGroups [

// ListManifests loads the manifests in the locations and returns them.
func (h *s3Handler) ListManifests(uri *url.URL) ([]string, error) {
mc, err := h.setup(uri)
if err != nil {
return nil, err
}
h.uri = uri

var manifests []string
doneCh := make(chan struct{})
defer close(doneCh)

suffix := "/" + backupManifest
for object := range mc.ListObjects(h.bucketName, h.objectPrefix, true, doneCh) {
for object := range h.mc.ListObjects(h.bucketName, h.objectPrefix, true, doneCh) {
if strings.HasSuffix(object.Key, suffix) {
manifests = append(manifests, object.Key)
}
Expand All @@ -305,12 +270,12 @@ func (h *s3Handler) ListManifests(uri *url.URL) ([]string, error) {
}

func (h *s3Handler) ReadManifest(path string, m *Manifest) error {
mc, err := h.setup(h.uri)
reader, err := h.mc.GetObject(h.bucketName, path, minio.GetObjectOptions{})
if err != nil {
return err
}

return h.readManifest(mc, path, m)
defer reader.Close()
return json.NewDecoder(reader).Decode(m)
}

// upload will block until it's done or an error occurs.
Expand Down