Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(restore): fix restoring of type keys from old version (20.11) #7456

Merged
merged 1 commit into from
Feb 18, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions worker/backup_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,8 +148,7 @@ func NewUriHandler(uri *url.URL, creds *x.MinioCredentials) (UriHandler, error)
// loadFn is a function that will receive the current file being read.
// It is passed the backup's groupId and a loadBackupInput holding the reader,
// the set of predicates to restore, any drop operations, and the backup version.
type loadFn func(reader io.Reader, groupId uint32, preds predicateSet,
dropOperations []*pb.DropOperation) (uint64, uint64, error)
type loadFn func(groupId uint32, in *loadBackupInput) (uint64, uint64, error)

// LoadBackup will scan location l for backup files in the given backup series and load them
// sequentially. Returns the maximum Since value on success, otherwise an error.
Expand Down
13 changes: 9 additions & 4 deletions worker/file_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,9 @@ func (h *fileHandler) Load(uri *url.URL, backupId string, backupNum uint64, fn l
// of the last backup.
predSet := manifests[len(manifests)-1].getPredsInGroup(gid)

groupMaxUid, groupMaxNsId, err := fn(fp, gid, predSet, manifest.DropOperations)
groupMaxUid, groupMaxNsId, err := fn(gid,
&loadBackupInput{r: fp, preds: predSet, dropOperations: manifest.DropOperations,
isOld: manifest.Version == 0})
if err != nil {
return LoadResult{Err: err}
}
Expand Down Expand Up @@ -285,7 +287,7 @@ func (h *fileHandler) ExportBackup(backupDir, exportDir, format string,
}

// Function to load a single backup file.
loadFn := func(r io.Reader, groupId uint32, preds predicateSet) (uint64, error) {
loadFn := func(r io.Reader, groupId uint32, preds predicateSet, isOld bool) (uint64, error) {
dir := filepath.Join(tmpDir, fmt.Sprintf("p%d", groupId))

r, err := enc.GetReader(key, r)
Expand Down Expand Up @@ -313,7 +315,10 @@ func (h *fileHandler) ExportBackup(backupDir, exportDir, format string,
return 0, errors.Wrapf(err, "cannot open DB at %s", dir)
}
defer db.Close()
_, _, err = loadFromBackup(db, gzReader, 0, preds, nil)
_, _, err = loadFromBackup(db, &loadBackupInput{
// TODO(Naman): Why are drop operations nil here?
r: gzReader, restoreTs: 0, preds: preds, dropOperations: nil, isOld: isOld,
})
if err != nil {
return 0, errors.Wrapf(err, "cannot load backup")
}
Expand Down Expand Up @@ -344,7 +349,7 @@ func (h *fileHandler) ExportBackup(backupDir, exportDir, format string,
// of the last backup.
predSet := manifest.getPredsInGroup(gid)

_, err = loadFn(fp, gid, predSet)
_, err = loadFn(fp, gid, predSet, manifest.Version == 0)
if err != nil {
return err
}
Expand Down
13 changes: 6 additions & 7 deletions worker/online_restore_ee.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ package worker
import (
"compress/gzip"
"context"
"io"
"net/url"
"strings"
"sync"
Expand Down Expand Up @@ -324,8 +323,7 @@ func getCredentialsFromRestoreRequest(req *pb.RestoreRequest) *x.MinioCredential
func writeBackup(ctx context.Context, req *pb.RestoreRequest) error {
res := LoadBackup(req.Location, req.BackupId, req.BackupNum,
getCredentialsFromRestoreRequest(req),
func(r io.Reader, groupId uint32, preds predicateSet,
dropOperations []*pb.DropOperation) (uint64, uint64, error) {
func(groupId uint32, in *loadBackupInput) (uint64, uint64, error) {
if groupId != req.GroupId {
// LoadBackup will try to call the backup function for every group.
// Exit here if the group is not the one indicated by the request.
Expand All @@ -340,17 +338,18 @@ func writeBackup(ctx context.Context, req *pb.RestoreRequest) error {
if err != nil {
return 0, 0, errors.Wrapf(err, "unable to read key")
}
r, err = enc.GetReader(key, r)
in.r, err = enc.GetReader(key, in.r)
if err != nil {
return 0, 0, errors.Wrapf(err, "cannot get encrypted reader")
}
gzReader, err := gzip.NewReader(r)
gzReader, err := gzip.NewReader(in.r)
if err != nil {
return 0, 0, errors.Wrapf(err, "couldn't create gzip reader")
}

maxUid, maxNsId, err := loadFromBackup(pstore, gzReader, req.RestoreTs, preds,
dropOperations)
maxUid, maxNsId, err := loadFromBackup(pstore, &loadBackupInput{
r: gzReader, restoreTs: req.RestoreTs, preds: in.preds, dropOperations: in.dropOperations,
})
if err != nil {
return 0, 0, errors.Wrapf(err, "cannot write backup")
}
Expand Down
53 changes: 41 additions & 12 deletions worker/restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/dgraph-io/badger/v3"
"github.com/dgraph-io/badger/v3/options"
bpb "github.com/dgraph-io/badger/v3/pb"
"github.com/golang/glog"
"github.com/pkg/errors"

"github.com/dgraph-io/dgraph/codec"
Expand All @@ -37,7 +38,8 @@ import (
)

// RunRestore calls badger.Load and tries to load data into a new DB.
func RunRestore(pdir, location, backupId string, key x.SensitiveByteSlice, ctype options.CompressionType, clevel int) LoadResult {
func RunRestore(pdir, location, backupId string, key x.SensitiveByteSlice,
ctype options.CompressionType, clevel int) LoadResult {
// Create the pdir if it doesn't exist.
if err := os.MkdirAll(pdir, 0700); err != nil {
return LoadResult{Err: err}
Expand All @@ -46,11 +48,10 @@ func RunRestore(pdir, location, backupId string, key x.SensitiveByteSlice, ctype
// Scan location for backup files and load them. Each file represents a node group,
// and we create a new p dir for each.
return LoadBackup(location, backupId, 0, nil,
func(r io.Reader, groupId uint32, preds predicateSet,
dropOperations []*pb.DropOperation) (uint64, uint64, error) {
func(groupId uint32, in *loadBackupInput) (uint64, uint64, error) {

dir := filepath.Join(pdir, fmt.Sprintf("p%d", groupId))
r, err := enc.GetReader(key, r)
r, err := enc.GetReader(key, in.r)
if err != nil {
return 0, 0, err
}
Expand Down Expand Up @@ -81,28 +82,37 @@ func RunRestore(pdir, location, backupId string, key x.SensitiveByteSlice, ctype
if !pathExist(dir) {
fmt.Println("Creating new db:", dir)
}
maxUid, maxNsId, err := loadFromBackup(db, gzReader, 0, preds, dropOperations)
maxUid, maxNsId, err := loadFromBackup(db, &loadBackupInput{
r: gzReader, restoreTs: 0, preds: in.preds, dropOperations: in.dropOperations,
})
if err != nil {
return 0, 0, err
}
return maxUid, maxNsId, x.WriteGroupIdFile(dir, uint32(groupId))
})
}

type loadBackupInput struct {
r io.Reader
restoreTs uint64
preds predicateSet
dropOperations []*pb.DropOperation
isOld bool
}

// loadFromBackup reads the backup, converts the keys and values to the required format,
// and loads them to the given badger DB. The set of predicates is used to avoid restoring
// values from predicates no longer assigned to this group.
// If restoreTs is greater than zero, the key-value pairs will be written with that timestamp.
// Otherwise, the original value is used.
// TODO(DGRAPH-1234): Check whether restoreTs can be removed.
func loadFromBackup(db *badger.DB, r io.Reader, restoreTs uint64, preds predicateSet,
dropOperations []*pb.DropOperation) (uint64, uint64, error) {
br := bufio.NewReaderSize(r, 16<<10)
func loadFromBackup(db *badger.DB, in *loadBackupInput) (uint64, uint64, error) {
br := bufio.NewReaderSize(in.r, 16<<10)
unmarshalBuf := make([]byte, 1<<10)

// if there were any DROP operations that need to be applied before loading the backup into
// the db, then apply them here
if err := applyDropOperationsBeforeRestore(db, dropOperations); err != nil {
if err := applyDropOperationsBeforeRestore(db, in.dropOperations); err != nil {
return 0, 0, errors.Wrapf(err, "cannot apply DROP operations while loading backup")
}

Expand Down Expand Up @@ -156,7 +166,7 @@ func loadFromBackup(db *badger.DB, r io.Reader, restoreTs uint64, preds predicat
if err != nil {
return 0, 0, errors.Wrapf(err, "could not parse key %s", hex.Dump(restoreKey))
}
if _, ok := preds[parsedKey.Attr]; !parsedKey.IsType() && !ok {
if _, ok := in.preds[parsedKey.Attr]; !parsedKey.IsType() && !ok {
continue
}

Expand All @@ -170,8 +180,8 @@ func loadFromBackup(db *badger.DB, r io.Reader, restoreTs uint64, preds predicat

// Override the version if requested. Should not be done for type and schema predicates,
// which always have their version set to 1.
if restoreTs > 0 && !parsedKey.IsSchema() && !parsedKey.IsType() {
kv.Version = restoreTs
if in.restoreTs > 0 && !parsedKey.IsSchema() && !parsedKey.IsType() {
kv.Version = in.restoreTs
}

switch kv.GetUserMeta()[0] {
Expand Down Expand Up @@ -218,6 +228,25 @@ func loadFromBackup(db *badger.DB, r io.Reader, restoreTs uint64, preds predicat
}

case posting.BitSchemaPosting:
appendNamespace := func() error {
// If the backup was taken on an old version, we need to append the namespace to
// the fields of TypeUpdate.
var update pb.TypeUpdate
if err := update.Unmarshal(kv.Value); err != nil {
return err
}
for _, sch := range update.Fields {
sch.Predicate = x.GalaxyAttr(sch.Predicate)
}
kv.Value, err = update.Marshal()
return err
}
if in.isOld && parsedKey.IsType() {
if err := appendNamespace(); err != nil {
glog.Errorf("Unable to (un)marshal type: %+v. Err=%v\n", parsedKey, err)
continue
}
}
// Schema and type keys are not stored in an intermediate format so their
// value can be written as is.
kv.Key = restoreKey
Expand Down
4 changes: 3 additions & 1 deletion worker/s3_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -252,7 +252,9 @@ func (h *s3Handler) Load(uri *url.URL, backupId string, backupNum uint64, fn loa
// of the last backup.
predSet := manifests[len(manifests)-1].getPredsInGroup(gid)

groupMaxUid, groupMaxNsId, err := fn(reader, gid, predSet, manifest.DropOperations)
groupMaxUid, groupMaxNsId, err := fn(gid,
&loadBackupInput{r: reader, preds: predSet, dropOperations: manifest.DropOperations,
isOld: manifest.Version == 0})
if err != nil {
return LoadResult{Err: err}
}
Expand Down