Skip to content

Commit

Permalink
fix(backup): use StreamWriter to write to badger during backup/restore
Browse files Browse the repository at this point in the history
This commit is a major rewrite of backup and online restore
code. It used to use KVLoader in badger. Now it instead uses
StreamWriter that is much faster for writes.

cherry-pick PR #7753

following commits are cherry-picked (in reverse order):
 * fix(backup): Fix full backup request (#7932) (#7933)
 * fix: fixing graphql schema update when the data is restored +
 * fix(restore): return nil if there is error (#7899)
        skipping /probe/graphql from audit (#7925)
 * Don't ban namespace in export_backup
 * reset the kv.StreamId before sending to stream writer (#7833) (#7837)
 * fix(restore): Bump uid and namespace after restore (#7790) (#7800)
 * fix(ee): GetKeys should return an error (#7713) (#7797)
 * fix(backup): Free the UidPack after use (#7786)
 * fix(export-backup): Fix double free in export backup (#7780) (#7783)
 * fix(lsbackup): Fix profiler in lsBackup (#7729)
 * Bring back "perf(Backup): Improve backup performance (#7601)"
 * Opt(Backup): Make backups faster (#7680)
 * Fix s3 backup copy (#7669)
 * [BREAKING] Opt(Restore): Optimize Restore's new map-reduce based design (#7666)
 * Perf(restore): Implement map-reduce based restore (#7664)
 * feat(backup): Merge backup refactoring
 * Revert "perf(Backup): Improve backup performance (#7601)"
  • Loading branch information
ahsanbarkati authored and mangalaman93 committed Jan 4, 2023
1 parent 4c1b575 commit 0c75dae
Show file tree
Hide file tree
Showing 56 changed files with 3,767 additions and 3,349 deletions.
4 changes: 3 additions & 1 deletion codec/codec.go
Original file line number Diff line number Diff line change
Expand Up @@ -405,7 +405,8 @@ func Decode(pack *pb.UidPack, seek uint64) []uint64 {

// DecodeToBuffer is the same as Decode but it returns a z.Buffer which is
// calloc'ed and can be SHOULD be freed up by calling buffer.Release().
func DecodeToBuffer(buf *z.Buffer, pack *pb.UidPack) {
func DecodeToBuffer(buf *z.Buffer, pack *pb.UidPack) *z.Buffer {

var last uint64
tmp := make([]byte, 16)
dec := Decoder{Pack: pack}
Expand All @@ -416,6 +417,7 @@ func DecodeToBuffer(buf *z.Buffer, pack *pb.UidPack) {
last = u
}
}
return buf
}

func match32MSB(num1, num2 uint64) bool {
Expand Down
1 change: 0 additions & 1 deletion codec/codec_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,6 @@ func TestBufferUidPack(t *testing.T) {
// Some edge case tests.
pack := Encode([]uint64{}, 128)
FreePack(pack)

buf := z.NewBuffer(10<<10, "TestBufferUidPack")
defer buf.Release()
DecodeToBuffer(buf, &pb.UidPack{})
Expand Down
2 changes: 1 addition & 1 deletion dgraph/cmd/bulk/reduce.go
Original file line number Diff line number Diff line change
Expand Up @@ -481,8 +481,8 @@ func (r *reducer) reduce(partitionKeys [][]byte, mapItrs []*mapIterator, ci *cou
partitionKeys = append(partitionKeys, nil)

for i := 0; i < len(partitionKeys); i++ {
pkey := partitionKeys[i]
for _, itr := range mapItrs {
pkey := partitionKeys[i]
itr.Next(cbuf, pkey)
}
if cbuf.LenNoPadding() < 256<<20 {
Expand Down
2 changes: 1 addition & 1 deletion dgraph/cmd/decrypt/decrypt.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ import (
)

type options struct {
// keyfile comes from the encryption or Vault flags
// keyfile comes from the encryption_key_file or Vault flags
keyfile x.Sensitive
file string
output string
Expand Down
44 changes: 27 additions & 17 deletions dgraph/cmd/increment/increment.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,40 +199,50 @@ func run(conf *viper.Viper) {
dg = dgTmp
}

// Run things serially first.
for i := 0; i < conc; i++ {
_, err := process(dg, conf)
x.Check(err)
num--
addOne := func(i int) error {
txnStart := time.Now() // Start time of transaction
cnt, err := process(dg, conf)
now := time.Now().UTC().Format(format)
if err != nil {
return err
}
serverLat := cnt.qLatency + cnt.mLatency
clientLat := time.Since(txnStart).Round(time.Millisecond)
fmt.Printf(
"[w%d] %-17s Counter VAL: %d [ Ts: %d ] Latency: Q %s M %s S %s C %s D %s\n",
i, now, cnt.Val, cnt.startTs, cnt.qLatency, cnt.mLatency,
serverLat, clientLat, clientLat-serverLat)
return nil
}

// Run things serially first, if conc > 1.
if conc > 1 {
for i := 0; i < conc; i++ {
err := addOne(0)
x.Check(err)
num--
}
}

var wg sync.WaitGroup
f := func(i int) {
f := func(worker int) {
defer wg.Done()
count := 0
for count < num {
txnStart := time.Now() // Start time of transaction
cnt, err := process(dg, conf)
now := time.Now().UTC().Format(format)
if err != nil {
if err := addOne(worker); err != nil {
now := time.Now().UTC().Format(format)
fmt.Printf("%-17s While trying to process counter: %v. Retrying...\n", now, err)
time.Sleep(time.Second)
continue
}
serverLat := cnt.qLatency + cnt.mLatency
clientLat := time.Since(txnStart).Round(time.Millisecond)
fmt.Printf(
"[%d] %-17s Counter VAL: %d [ Ts: %d ] Latency: Q %s M %s S %s C %s D %s\n",
i, now, cnt.Val, cnt.startTs, cnt.qLatency, cnt.mLatency,
serverLat, clientLat, clientLat-serverLat)
time.Sleep(waitDur)
count++
}
}

for i := 0; i < conc; i++ {
wg.Add(1)
go f(i)
go f(i + 1)
}
wg.Wait()
}
1 change: 1 addition & 0 deletions dgraph/cmd/root_ee.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
//go:build !oss
// +build !oss

/*
Expand Down
23 changes: 22 additions & 1 deletion dgraph/cmd/zero/assign.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,9 @@ func (s *Server) lease(ctx context.Context, num *pb.Num) (*pb.AssignedIds, error
}

// AssignIds is used to assign new ids (UIDs, NsIDs) by communicating with the leader of the
// RAFT group responsible for handing out ids.
// RAFT group responsible for handing out ids. If bump is set to true in the request then the
// lease for the given id type is bumped to num.Val and {startId, endId} of the newly leased ids
// in the process of bump is returned.
func (s *Server) AssignIds(ctx context.Context, num *pb.Num) (*pb.AssignedIds, error) {
if ctx.Err() != nil {
return &emptyAssignedIds, ctx.Err()
Expand Down Expand Up @@ -246,6 +248,25 @@ func (s *Server) AssignIds(ctx context.Context, num *pb.Num) (*pb.AssignedIds, e
return err
}

// If this is a bump request and the current node is the leader then we create a normal lease
// request based on the number of required ids to reach the asked bump value. If the current
// node is not the leader then the bump request will be forwarded to the leader by lease().
if num.GetBump() && s.Node.AmLeader() {
s.leaseLock.Lock()
cur := s.nextLease[num.GetType()] - 1
s.leaseLock.Unlock()

// We need to lease more UIDs if bump request is more than current max lease.
req := num.GetVal()
if cur >= req {
return &emptyAssignedIds, errors.Errorf("Nothing to be leased")
}
num.Val = req - cur

// Set bump to false because we want to lease the required ids in the following request.
num.Bump = false
}

c := make(chan error, 1)
go func() {
c <- lease()
Expand Down
39 changes: 39 additions & 0 deletions dgraph/cmd/zero/zero_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/dgraph-io/dgraph/protos/pb"
"github.com/dgraph-io/dgraph/testutil"
"github.com/stretchr/testify/require"
"google.golang.org/grpc"
)

func TestRemoveNode(t *testing.T) {
Expand All @@ -44,3 +45,41 @@ func TestIdLeaseOverflow(t *testing.T) {
require.Error(t, err)
require.Contains(t, err.Error(), "limit has reached")
}

func TestIdBump(t *testing.T) {
dialOpts := []grpc.DialOption{
grpc.WithBlock(),
grpc.WithInsecure(),
}
ctx := context.Background()
con, err := grpc.DialContext(ctx, testutil.SockAddrZero, dialOpts...)
require.NoError(t, err)

zc := pb.NewZeroClient(con)

res, err := zc.AssignIds(ctx, &pb.Num{Val: 10, Type: pb.Num_UID})
require.NoError(t, err)
require.Equal(t, uint64(10), res.GetEndId()-res.GetStartId()+1)

// Next assignemnt's startId should be greater than 10.
res, err = zc.AssignIds(ctx, &pb.Num{Val: 50, Type: pb.Num_UID})
require.NoError(t, err)
require.Greater(t, res.GetStartId(), uint64(10))
require.Equal(t, uint64(50), res.GetEndId()-res.GetStartId()+1)

bumpTo := res.GetEndId() + 100000

// Bump the lease to (last result + 100000).
res, err = zc.AssignIds(ctx, &pb.Num{Val: bumpTo, Type: pb.Num_UID, Bump: true})
require.NoError(t, err)

// Next assignemnt's startId should be greater than bumpTo.
res, err = zc.AssignIds(ctx, &pb.Num{Val: 10, Type: pb.Num_UID})
require.NoError(t, err)
require.Greater(t, res.GetStartId(), bumpTo)
require.Equal(t, uint64(10), res.GetEndId()-res.GetStartId()+1)

// If bump request is less than maxLease, then it should result in no-op.
res, err = zc.AssignIds(ctx, &pb.Num{Val: 10, Type: pb.Num_UID, Bump: true})
require.Contains(t, err.Error(), "Nothing to be leased")
}
2 changes: 1 addition & 1 deletion dgraph/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ func main() {
for range ticker.C {
// Read Jemalloc stats first. Print if there's a big difference.
z.ReadMemStats(&js)
if diff := absDiff(uint64(z.NumAllocBytes()), lastAlloc); diff > 256<<20 {
if diff := absDiff(uint64(z.NumAllocBytes()), lastAlloc); diff > 1<<30 {
glog.V(2).Infof("NumAllocBytes: %s jemalloc: Active %s Allocated: %s"+
" Resident: %s Retained: %s\n",
humanize.IBytes(uint64(z.NumAllocBytes())),
Expand Down
5 changes: 3 additions & 2 deletions ee/acl/acl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"github.com/dgraph-io/dgo/v210/protos/api"
"github.com/dgraph-io/dgraph/testutil"
"github.com/dgraph-io/dgraph/x"

"github.com/golang/glog"
"github.com/stretchr/testify/require"
)
Expand Down Expand Up @@ -2858,7 +2859,7 @@ func TestGuardianOnlyAccessForAdminEndpoints(t *testing.T) {
queryName: "listBackups",
respIsArray: true,
testGuardianAccess: true,
guardianErr: "The path \"\" does not exist or it is inaccessible.",
guardianErr: `The uri path: "" doesn't exist`,
guardianData: `{"listBackups": []}`,
},
{
Expand Down Expand Up @@ -2939,7 +2940,7 @@ func TestGuardianOnlyAccessForAdminEndpoints(t *testing.T) {
}`,
queryName: "restore",
testGuardianAccess: true,
guardianErr: "The path \"\" does not exist or it is inaccessible.",
guardianErr: `The uri path: "" doesn't exist`,
guardianData: `{"restore": {"code": "Failure"}}`,
},
{
Expand Down
5 changes: 3 additions & 2 deletions ee/audit/interceptor_ee.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,9 @@ var skipApis = map[string]bool{

var skipEPs = map[string]bool{
// list of endpoints that needs to be skipped
"/health": true,
"/state": true,
"/health": true,
"/state": true,
"/probe/graphql": true,
}

func AuditRequestGRPC(ctx context.Context, req interface{},
Expand Down
Loading

0 comments on commit 0c75dae

Please sign in to comment.