Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

v20.03: fix(backup/restore): fixes backup and restore with DROP operations (G… #6933

Merged
merged 3 commits into from
Nov 23, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 6 additions & 16 deletions dgraph/cmd/alpha/run_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -244,12 +244,9 @@ func TestDeletePredicate(t *testing.T) {
output, err = runGraphqlQuery(`schema{}`)
require.NoError(t, err)

testutil.CompareJSON(t, `{"data":{"schema":[`+
`{"predicate":"age","type":"default"},`+
`{"predicate":"name","type":"string","index":true, "tokenizer":["term"]},`+
x.AclPredicates+","+x.GraphqlPredicates+","+
`{"predicate":"dgraph.type","type":"string","index":true, "tokenizer":["exact"],
"list":true}],`+x.InitialTypes+`}}`, output)
testutil.CompareJSON(t, testutil.GetFullSchemaHTTPResponse(testutil.SchemaOptions{UserPreds: `{"predicate":"age","type":"default"},` +
`{"predicate":"name","type":"string","index":true, "tokenizer":["term"]}`}),
output)

output, err = runGraphqlQuery(q1)
require.NoError(t, err)
Expand Down Expand Up @@ -1076,11 +1073,8 @@ func TestListTypeSchemaChange(t *testing.T) {
q = `schema{}`
res, err = runGraphqlQuery(q)
require.NoError(t, err)
testutil.CompareJSON(t, `{"data":{"schema":[`+
x.AclPredicates+","+x.GraphqlPredicates+","+
`{"predicate":"occupations","type":"string"},`+
`{"predicate":"dgraph.type", "type":"string", "index":true, "tokenizer": ["exact"],
"list":true}],`+x.InitialTypes+`}}`, res)
testutil.CompareJSON(t, testutil.GetFullSchemaHTTPResponse(testutil.
SchemaOptions{UserPreds: `{"predicate":"occupations","type":"string"}`}), res)
}

func TestDeleteAllSP2(t *testing.T) {
Expand Down Expand Up @@ -1323,11 +1317,7 @@ func TestDropAll(t *testing.T) {
q3 := "schema{}"
output, err = runGraphqlQuery(q3)
require.NoError(t, err)
testutil.CompareJSON(t,
`{"data":{"schema":[`+
x.AclPredicates+","+x.GraphqlPredicates+","+
`{"predicate":"dgraph.type", "type":"string", "index":true, "tokenizer":["exact"],
"list":true}],`+x.InitialTypes+`}}`, output)
testutil.CompareJSON(t, testutil.GetFullSchemaHTTPResponse(testutil.SchemaOptions{}), output)

// Reinstate schema so that we can re-run the original query.
err = alterSchemaWithRetry(s)
Expand Down
1 change: 1 addition & 0 deletions dgraph/cmd/bulk/systest/test-bulk-schema.sh
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,7 @@ EOF
dgraph debug -p out/1/p 2>|/dev/null | grep '{s}' | cut -d' ' -f4 >> all_dbs.out
diff <(LC_ALL=C sort all_dbs.out | uniq -c) - <<EOF
1 dgraph.acl.rule
1 dgraph.drop.op
1 dgraph.graphql.schema
1 dgraph.graphql.xid
1 dgraph.password
Expand Down
40 changes: 40 additions & 0 deletions edgraph/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,28 @@ func PeriodicallyPostTelemetry() {
}
}

// insertDropRecord is used to insert a helper record when a DROP operation is performed.
// This helper record lets us know during backup that a DROP operation was performed and that we
// need to write this information in backup manifest. So that while restoring from a backup series,
// we can create an exact replica of the system which existed at the time the last backup was taken.
// Note that if the server crashes after the DROP operation & before this helper record is inserted,
// then restoring from the incremental backup of such a DB would restore even the dropped
// data back.
func insertDropRecord(ctx context.Context, dropOp string) error {
_, err := (&Server{}).doQuery(context.WithValue(ctx, IsGraphql, true),
&api.Request{
Mutations: []*api.Mutation{{
Set: []*api.NQuad{{
Subject: "_:r",
Predicate: "dgraph.drop.op",
ObjectValue: &api.Value{Val: &api.Value_StrVal{StrVal: dropOp}},
}},
}},
CommitNow: true,
}, NoAuthorize)
return err
}

// Alter handles requests to change the schema or remove parts or all of the data.
func (s *Server) Alter(ctx context.Context, op *api.Operation) (*api.Payload, error) {
ctx, span := otrace.StartSpan(ctx, "Server.Alter")
Expand Down Expand Up @@ -177,6 +199,12 @@ func (s *Server) Alter(ctx context.Context, op *api.Operation) (*api.Payload, er

m.DropOp = pb.Mutations_ALL
_, err := query.ApplyMutations(ctx, m)
if err != nil {
return empty, err
}

// insert a helper record for backup & restore, indicating that drop_all was done
err = insertDropRecord(ctx, "DROP_ALL;")

// recreate the admin account after a drop all operation
ResetAcl(nil)
Expand All @@ -190,6 +218,12 @@ func (s *Server) Alter(ctx context.Context, op *api.Operation) (*api.Payload, er

m.DropOp = pb.Mutations_DATA
_, err := query.ApplyMutations(ctx, m)
if err != nil {
return empty, err
}

// insert a helper record for backup & restore, indicating that drop_data was done
err = insertDropRecord(ctx, "DROP_DATA;")

// recreate the admin account after a drop data operation
ResetAcl(nil)
Expand Down Expand Up @@ -228,6 +262,12 @@ func (s *Server) Alter(ctx context.Context, op *api.Operation) (*api.Payload, er
edges := []*pb.DirectedEdge{edge}
m.Edges = edges
_, err = query.ApplyMutations(ctx, m)
if err != nil {
return empty, err
}

// insert a helper record for backup & restore, indicating that drop_attr was done
err = insertDropRecord(ctx, "DROP_ATTR;"+attr)
return empty, err
}

Expand Down
16 changes: 16 additions & 0 deletions graphql/e2e/common/admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,10 @@ const (
// successfully connected
initSchema = `{
"schema": [
{
"predicate": "dgraph.drop.op",
"type": "string"
},
{
"predicate": "dgraph.graphql.schema",
"type": "string"
Expand Down Expand Up @@ -88,6 +92,10 @@ const (
"predicate": "A.b",
"type": "string"
},
{
"predicate": "dgraph.drop.op",
"type": "string"
},
{
"predicate": "dgraph.graphql.schema",
"type": "string"
Expand Down Expand Up @@ -158,6 +166,10 @@ const (
"predicate": "A.c",
"type": "int"
},
{
"predicate": "dgraph.drop.op",
"type": "string"
},
{
"predicate": "dgraph.graphql.schema",
"type": "string"
Expand Down Expand Up @@ -239,6 +251,10 @@ const (
"predicate": "A.d",
"type": "float"
},
{
"predicate": "dgraph.drop.op",
"type": "string"
},
{
"predicate": "dgraph.graphql.schema",
"type": "string"
Expand Down
3 changes: 3 additions & 0 deletions graphql/e2e/directives/dgraph_directives_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,9 @@ func TestSchema_WithDgraphDirectives(t *testing.T) {
}, {
"predicate": "dgraph.employee.en.ename",
"type": "string"
}, {
"predicate": "dgraph.drop.op",
"type": "string"
}, {
"predicate": "dgraph.graphql.schema",
"type": "string"
Expand Down
3 changes: 3 additions & 0 deletions graphql/e2e/normal/normal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,9 @@ func TestSchema_Normal(t *testing.T) {
}, {
"predicate": "User.password",
"type": "password"
}, {
"predicate": "dgraph.drop.op",
"type": "string"
}, {
"predicate": "dgraph.graphql.schema",
"type": "string"
Expand Down
17 changes: 16 additions & 1 deletion protos/pb.proto
Original file line number Diff line number Diff line change
Expand Up @@ -518,7 +518,7 @@ service Worker {
rpc StreamSnapshot (stream Snapshot) returns (stream KVS) {}
rpc Sort (SortMessage) returns (SortResult) {}
rpc Schema (SchemaRequest) returns (SchemaResult) {}
rpc Backup (BackupRequest) returns (Status) {}
rpc Backup (BackupRequest) returns (BackupResponse) {}
rpc Export (ExportRequest) returns (Status) {}
rpc ReceivePredicate(stream KVS) returns (api.Payload) {}
rpc MovePredicate(MovePredicatePayload) returns (api.Payload) {}
Expand Down Expand Up @@ -578,6 +578,21 @@ message BackupRequest {
repeated string predicates = 10;
}

message BackupResponse {
repeated DropOperation drop_operations = 1;
}

message DropOperation {
enum DropOp {
ALL = 0;
DATA = 1;
ATTR = 2;
}
DropOp drop_op = 1;
// When drop_op is ATTR, drop_value will be the name of the ATTR; empty otherwise.
string drop_value = 2;
}

message ExportRequest {
uint32 group_id = 1; // Group id to back up.
uint64 read_ts = 2;
Expand Down
Loading