Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(raftwal): take snapshot after restore #7719

Merged
merged 4 commits into from
Apr 13, 2021
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 38 additions & 0 deletions systest/online-restore/online_restore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,13 @@ import (

"github.com/dgraph-io/dgo/v210"
"github.com/dgraph-io/dgo/v210/protos/api"
"github.com/graph-gophers/graphql-go/errors"
"github.com/stretchr/testify/require"
"google.golang.org/grpc"

"github.com/dgraph-io/dgraph/chunker"
"github.com/dgraph-io/dgraph/testutil"
"github.com/dgraph-io/dgraph/x"
)

func sendRestoreRequest(t *testing.T, location, backupId string, backupNum int) {
Expand Down Expand Up @@ -96,6 +98,34 @@ func disableDraining(t *testing.T) {
require.Contains(t, string(buf), "draining mode has been set to false")
}

func getSnapshotTs(t *testing.T) uint64 {
snapTsRequest := `query {
state {
groups{
id
snapshotTs
}
}
}`

params := testutil.GraphQLParams{
Query: snapTsRequest,
}
resp := testutil.MakeGQLRequestWithTLS(t, &params, testutil.GetAlphaClientConfig(t))
resp.RequireNoGraphQLErrors(t)

var stateResp struct {
State struct {
Groups []struct {
SnapshotTs uint64
}
}
}
require.NoError(t, json.Unmarshal(resp.Data, &stateResp))
require.GreaterOrEqual(t, len(stateResp.State.Groups), 1)
return stateResp.State.Groups[0].SnapshotTs
}

func runQueries(t *testing.T, dg *dgo.Dgraph, shouldFail bool) {
_, thisFile, _, _ := runtime.Caller(0)
queryDir := filepath.Join(filepath.Dir(thisFile), "queries")
Expand Down Expand Up @@ -177,8 +207,16 @@ func TestBasicRestore(t *testing.T) {
ctx := context.Background()
require.NoError(t, dg.Alter(ctx, &api.Operation{DropAll: true}))

snapshotTs := getSnapshotTs(t)
sendRestoreRequest(t, "", "youthful_rhodes3", 0)
testutil.WaitForRestore(t, dg)
// Snapshot must be taken just after the restore and hence the snapshotTs be updated.
require.NoError(t, x.RetryUntilSuccess(3, 1*time.Second, func() error {
if getSnapshotTs(t) <= snapshotTs {
return errors.Errorf("snapshot not taken after restore")
}
return nil
}))
runQueries(t, dg, false)
runMutations(t, dg)
}
Expand Down
4 changes: 3 additions & 1 deletion worker/draft.go
Original file line number Diff line number Diff line change
Expand Up @@ -752,7 +752,9 @@ func (n *node) applyCommitted(proposal *pb.Proposal) error {
}
defer closer.Done()

if err := handleRestoreProposal(ctx, proposal.Restore); err != nil {
glog.Infof("Got restore proposal at Index:%d, ReadTs:%d",
proposal.Index, proposal.Restore.RestoreTs)
if err := handleRestoreProposal(ctx, proposal.Restore, proposal.Index); err != nil {
return err
}

Expand Down
18 changes: 14 additions & 4 deletions worker/online_restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -266,7 +266,7 @@ func (w *grpcWorker) Restore(ctx context.Context, req *pb.RestoreRequest) (*pb.S
}

// TODO(DGRAPH-1232): Ensure all groups receive the restore proposal.
func handleRestoreProposal(ctx context.Context, req *pb.RestoreRequest) error {
func handleRestoreProposal(ctx context.Context, req *pb.RestoreRequest, pidx uint64) error {
if req == nil {
return errors.Errorf("nil restore request")
}
Expand Down Expand Up @@ -364,9 +364,19 @@ func handleRestoreProposal(ctx context.Context, req *pb.RestoreRequest) error {

// Propose a snapshot immediately after all the work is done to prevent the restore
// from being replayed.
if err := groups().Node.proposeSnapshot(); err != nil {
return errors.Wrapf(err, "cannot propose snapshot after processing restore proposal")
}
go func(idx uint64) {
n := groups().Node
if !n.AmLeader() {
return
}
if err := n.Applied.WaitForMark(context.Background(), idx); err != nil {
glog.Errorf("Error waiting for mark for index %d: %+v", idx, err)
return
}
if err := n.proposeSnapshot(); err != nil {
glog.Errorf("cannot propose snapshot after processing restore proposal %+v", err)
}
}(pidx)

// Update the membership state to re-compute the group checksums.
if err := UpdateMembershipState(ctx); err != nil {
Expand Down