Skip to content

Commit

Permalink
Bring back "perf(Backup): Improve backup performance (#7601)"
Browse files Browse the repository at this point in the history
This commit brings back the changes of PR #7601 which are still valid.
    - Add new ReadTs field to Manifest and use it instead of sinceTs
  • Loading branch information
ahsanbarkati committed Apr 23, 2021
1 parent 6a08816 commit f89d3b4
Show file tree
Hide file tree
Showing 10 changed files with 48 additions and 32 deletions.
7 changes: 2 additions & 5 deletions codec/codec.go
Original file line number Diff line number Diff line change
Expand Up @@ -405,15 +405,12 @@ func Decode(pack *pb.UidPack, seek uint64) []uint64 {

// DecodeToBuffer is the same as Decode but it returns a z.Buffer which is
// calloc'ed and SHOULD be freed up by calling buffer.Release().
func DecodeToBuffer(pack *pb.UidPack, seek uint64) *z.Buffer {
buf, err := z.NewBufferWith(256<<20, 32<<30, z.UseCalloc, "Codec.DecodeToBuffer")
x.Check(err)
buf.AutoMmapAfter(1 << 30)
func DecodeToBuffer(buf *z.Buffer, pack *pb.UidPack) *z.Buffer {

var last uint64
tmp := make([]byte, 16)
dec := Decoder{Pack: pack}
for uids := dec.Seek(seek, SeekStart); len(uids) > 0; uids = dec.Next() {
for uids := dec.Seek(0, SeekStart); len(uids) > 0; uids = dec.Next() {
for _, u := range uids {
n := binary.PutUvarint(tmp, u-last)
x.Check2(buf.Write(tmp[:n]))
Expand Down
10 changes: 8 additions & 2 deletions codec/codec_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (

"github.com/dgraph-io/dgraph/protos/pb"
"github.com/dgraph-io/dgraph/x"
"github.com/dgraph-io/ristretto/z"
humanize "github.com/dustin/go-humanize"
"github.com/stretchr/testify/require"
)
Expand Down Expand Up @@ -74,7 +75,9 @@ func TestBufferUidPack(t *testing.T) {
// Some edge case tests.
pack := Encode([]uint64{}, 128)
FreePack(pack)
buf := DecodeToBuffer(&pb.UidPack{}, 0)
buf := z.NewBuffer(10<<10, "TestBufferUidPack")
defer buf.Release()
DecodeToBuffer(buf, &pb.UidPack{})
require.Equal(t, 0, buf.LenNoPadding())
require.NoError(t, buf.Release())

Expand All @@ -90,7 +93,10 @@ func TestBufferUidPack(t *testing.T) {
actual := Decode(pack, 0)
require.Equal(t, expected, actual)

actualbuffer := DecodeToBuffer(pack, 0)
actualbuffer := z.NewBuffer(10<<10, "TestBufferUidPack")
defer actualbuffer.Release()

DecodeToBuffer(actualbuffer, pack)
enc := EncodeFromBuffer(actualbuffer.Bytes(), 256)
require.Equal(t, ExactLen(pack), ExactLen(enc))

Expand Down
6 changes: 4 additions & 2 deletions ee/backup/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ func runLsbackupCmd() error {
type backupEntry struct {
Path string `json:"path"`
Since uint64 `json:"since"`
ReadTs uint64 `json:"read_ts"`
BackupId string `json:"backup_id"`
BackupNum uint64 `json:"backup_num"`
Encrypted bool `json:"encrypted"`
Expand All @@ -108,7 +109,8 @@ func runLsbackupCmd() error {

be := backupEntry{
Path: manifest.Path,
Since: manifest.Since,
Since: manifest.SinceTsDeprecated,
ReadTs: manifest.ReadTs,
BackupId: manifest.BackupId,
BackupNum: manifest.BackupNum,
Encrypted: manifest.Encrypted,
Expand Down Expand Up @@ -271,7 +273,7 @@ func runExportBackup() error {
}
in := &pb.ExportRequest{
GroupId: uint32(gid),
ReadTs: latestManifest.Since,
ReadTs: latestManifest.ValidReadTs(),
UnixTs: time.Now().Unix(),
Format: opt.format,
Destination: exportDir,
Expand Down
4 changes: 3 additions & 1 deletion graphql/admin/list_backups.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ type group struct {
type manifest struct {
Type string `json:"type,omitempty"`
Since uint64 `json:"since,omitempty"`
ReadTs uint64 `json:"read_ts,omitempty"`
Groups []*group `json:"groups,omitempty"`
BackupId string `json:"backupId,omitempty"`
BackupNum uint64 `json:"backupNum,omitempty"`
Expand Down Expand Up @@ -107,7 +108,8 @@ func convertManifests(manifests []*worker.Manifest) []*manifest {
for i, m := range manifests {
res[i] = &manifest{
Type: m.Type,
Since: m.Since,
Since: m.SinceTsDeprecated,
ReadTs: m.ReadTs,
BackupId: m.BackupId,
BackupNum: m.BackupNum,
Path: m.Path,
Expand Down
14 changes: 5 additions & 9 deletions posting/list.go
Original file line number Diff line number Diff line change
Expand Up @@ -870,15 +870,11 @@ func (l *List) ToBackupPostingList(
defer out.free()

ol := out.plist
// Encode uids to []byte instead of []uint64 if we have more than 1000
// uids. We do this to improve the memory usage.
if codec.ApproxLen(ol.Pack) > 1024 {
buf := codec.DecodeToBuffer(ol.Pack, 0)
defer buf.Release()
bl.UidBytes = buf.Bytes()
} else {
bl.Uids = codec.Decode(ol.Pack, 0)
}

// Encode uids to []byte instead of []uint64. This helps improve memory usage.
buf.Reset()
codec.DecodeToBuffer(buf, ol.Pack)
bl.UidBytes = buf.Bytes()
bl.Postings = ol.Postings
bl.CommitTs = ol.CommitTs
bl.Splits = ol.Splits
Expand Down
16 changes: 12 additions & 4 deletions worker/backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,10 +46,11 @@ type Manifest struct {
sync.Mutex
// Type is the type of backup, either full or incremental.
Type string `json:"type"`
// Since is the timestamp at which this backup was taken. It's called Since
// because it will become the timestamp from which to backup in the next
// incremental backup.
Since uint64 `json:"since"`
// SinceTsDeprecated is kept for backward compatibility. Use readTs instead of sinceTs.
SinceTsDeprecated uint64 `json:"since"`
// ReadTs is the timestamp at which this backup was taken. This would be
// the since timestamp for the next incremental backup.
ReadTs uint64 `json:"read_ts"`
// Groups is the map of valid groups to predicates at the time the backup was created.
Groups map[uint32][]string `json:"groups"`
// BackupId is a unique ID assigned to all the backups in the same series
Expand All @@ -75,6 +76,13 @@ type Manifest struct {
Compression string `json:"compression"`
}

// ValidReadTs returns the read timestamp at which this backup was taken.
// Manifests written by older versions only recorded the deprecated "since"
// field, so when ReadTs is unset (zero) we fall back to SinceTsDeprecated.
func (m *Manifest) ValidReadTs() uint64 {
	if ts := m.ReadTs; ts != 0 {
		return ts
	}
	return m.SinceTsDeprecated
}

type MasterManifest struct {
Manifests []*Manifest
}
Expand Down
11 changes: 7 additions & 4 deletions worker/backup_ee.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,8 +173,11 @@ func doBackup(ctx context.Context, req *pb.BackupRequest, forceFull bool) error
return err
}

req.SinceTs = latestManifest.Since
// Use the readTs as the sinceTs for the next backup. If not found, use the
// SinceTsDeprecated value from the latest manifest.
req.SinceTs = latestManifest.ValidReadTs()
if forceFull {
// To force a full backup we'll set the sinceTs to zero.
req.SinceTs = 0
} else {
if x.WorkerConfig.EncryptionKey != nil {
Expand Down Expand Up @@ -241,7 +244,7 @@ func doBackup(ctx context.Context, req *pb.BackupRequest, forceFull bool) error

dir := fmt.Sprintf(backupPathFmt, req.UnixTs)
m := Manifest{
Since: req.ReadTs,
ReadTs: req.ReadTs,
Groups: predMap,
Version: x.DgraphVersion,
DropOperations: dropOperations,
Expand Down Expand Up @@ -584,8 +587,8 @@ func (pr *BackupProcessor) CompleteBackup(ctx context.Context, m *Manifest) erro

// GoString implements the GoStringer interface for Manifest, rendering the
// deprecated since timestamp, the read timestamp, the group map, and the
// encryption flag.
func (m *Manifest) GoString() string {
	const format = `Manifest{Since: %d, ReadTs: %d, Groups: %v, Encrypted: %v}`
	return fmt.Sprintf(format,
		m.SinceTsDeprecated,
		m.ReadTs,
		m.Groups,
		m.Encrypted)
}

func (tl *threadLocal) toBackupList(key []byte, itr *badger.Iterator) (
Expand Down
2 changes: 1 addition & 1 deletion worker/backup_manifest.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ func getFilteredManifests(h UriHandler, manifests []*Manifest,
for _, m := range manifests {
missingFiles := false
for g, _ := range m.Groups {
path := filepath.Join(m.Path, backupName(m.Since, g))
path := filepath.Join(m.Path, backupName(m.ValidReadTs(), g))
if !h.FileExists(path) {
missingFiles = true
break
Expand Down
2 changes: 1 addition & 1 deletion worker/online_restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -471,5 +471,5 @@ func RunOfflineRestore(dir, location, backupId string, keyFile string,
}
}
// TODO: Fix this return value.
return LoadResult{Version: manifest.Since}
return LoadResult{Version: manifest.ValidReadTs()}
}
8 changes: 5 additions & 3 deletions worker/restore_map.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ type loadBackupInput struct {
dropOperations []*pb.DropOperation
isOld bool
keepSchema bool
compression string
}

type listReq struct {
Expand Down Expand Up @@ -577,7 +578,7 @@ func RunMapper(req *pb.RestoreRequest, mapDir string) error {
if dropAll {
break
}
if manifest.Since == 0 || len(manifest.Groups) == 0 {
if manifest.ValidReadTs() == 0 || len(manifest.Groups) == 0 {
continue
}
for gid := range manifest.Groups {
Expand All @@ -589,7 +590,7 @@ func RunMapper(req *pb.RestoreRequest, mapDir string) error {

// Only restore the predicates that were assigned to this group at the time
// of the last backup.
file := filepath.Join(manifest.Path, backupName(manifest.Since, gid))
file := filepath.Join(manifest.Path, backupName(manifest.ValidReadTs(), gid))
br := readerFrom(h, file).WithEncryption(encKey).WithCompression(manifest.Compression)
if br.err != nil {
return errors.Wrap(br.err, "newBackupReader")
Expand All @@ -609,7 +610,8 @@ func RunMapper(req *pb.RestoreRequest, mapDir string) error {
isOld: manifest.Version == 0,
restoreTs: req.RestoreTs,
// Only map the schema keys corresponding to the latest backup.
keepSchema: i == 0,
keepSchema: i == 0,
compression: manifest.Compression,
}

// This would stream the backups from the source, and map them in
Expand Down

0 comments on commit f89d3b4

Please sign in to comment.