Skip to content

Commit

Permalink
2103: feat(schema): do schema versioning and make backup non-blocking…
Browse files Browse the repository at this point in the history
… for i… (#7856)

* feat(schema): do schema versioning and make backup non-blocking for indexing (#7852)

Issue:
We used to write schema/types at timestamp=1. That single bit violates the Snapshot Isolation. Hence, we couldn't run schema updates and other operations like backup concurrently. This is an issue in a multi-tenant cluster because if the backup is running we couldn't update the schema.
Fix:
This PR writes the schema/types at the startTs of the transaction and enables the schema updates to be made even if the backup is running.

(cherry picked from commit f376a40)

* fix the predicate move

(cherry picked from commit eb0a04b)
  • Loading branch information
NamanJain8 committed Jun 1, 2021
1 parent 0e417a2 commit 4165f3d
Show file tree
Hide file tree
Showing 16 changed files with 514 additions and 412 deletions.
5 changes: 2 additions & 3 deletions posting/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -574,7 +574,6 @@ func (r *rebuilder) Run(ctx context.Context) error {
WithNumVersionsToKeep(math.MaxInt32).
WithLogger(&x.ToGlog{}).
WithCompression(options.None).
WithEncryptionKey(x.WorkerConfig.EncryptionKey).
WithLoggingLevel(badger.WARNING).
WithMetricsEnabled(false)

Expand Down Expand Up @@ -1242,14 +1241,14 @@ func DeleteData() error {
}

// DeletePredicate deletes all entries and indices for a given predicate.
func DeletePredicate(ctx context.Context, attr string) error {
func DeletePredicate(ctx context.Context, attr string, ts uint64) error {
glog.Infof("Dropping predicate: [%s]", attr)
prefix := x.PredicatePrefix(attr)
if err := pstore.DropPrefix(prefix); err != nil {
return err
}

return schema.State().Delete(attr)
return schema.State().Delete(attr, ts)
}

// DeleteNamespace bans the namespace and deletes its predicates/types from the schema.
Expand Down
2 changes: 2 additions & 0 deletions protos/pb.proto
Original file line number Diff line number Diff line change
Expand Up @@ -334,6 +334,8 @@ message Proposal {
RestoreRequest restore = 12;
CDCState cdc_state = 13;
DeleteNsRequest delete_ns = 14; // Used to delete namespace.
// Skipping 15 as it is used for uint64 key in master and might be needed later here.
uint64 start_ts = 16;
}

message CDCState {
Expand Down
710 changes: 374 additions & 336 deletions protos/pb/pb.pb.go

Large diffs are not rendered by default.

21 changes: 11 additions & 10 deletions schema/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"context"
"encoding/hex"
"fmt"
"math"
"sync"

"github.com/golang/glog"
Expand Down Expand Up @@ -96,17 +97,17 @@ func (s *state) DeleteAll() {
}

// Delete updates the schema in memory and disk
func (s *state) Delete(attr string) error {
func (s *state) Delete(attr string, ts uint64) error {
s.Lock()
defer s.Unlock()

glog.Infof("Deleting schema for predicate: [%s]", attr)
txn := pstore.NewTransactionAt(1, true)
txn := pstore.NewTransactionAt(ts, true)
if err := txn.Delete(x.SchemaKey(attr)); err != nil {
return err
}
// Delete is called rarely so sync write should be fine.
if err := txn.CommitAt(1, nil); err != nil {
if err := txn.CommitAt(ts, nil); err != nil {
return err
}

Expand All @@ -116,7 +117,7 @@ func (s *state) Delete(attr string) error {
}

// DeleteType updates the schema in memory and disk
func (s *state) DeleteType(typeName string) error {
func (s *state) DeleteType(typeName string, ts uint64) error {
if s == nil {
return nil
}
Expand All @@ -125,12 +126,12 @@ func (s *state) DeleteType(typeName string) error {
defer s.Unlock()

glog.Infof("Deleting type definition for type: [%s]", typeName)
txn := pstore.NewTransactionAt(1, true)
txn := pstore.NewTransactionAt(ts, true)
if err := txn.Delete(x.TypeKey(typeName)); err != nil {
return err
}
// Delete is called rarely so sync write should be fine.
if err := txn.CommitAt(1, nil); err != nil {
if err := txn.CommitAt(ts, nil); err != nil {
return err
}

Expand Down Expand Up @@ -466,14 +467,14 @@ func Init(ps *badger.DB) {
reset()
}

// Load reads the schema for the given predicate from the DB.
// Load reads the latest schema for the given predicate from the DB.
func Load(predicate string) error {
if len(predicate) == 0 {
return errors.Errorf("Empty predicate")
}
delete(State().mutSchema, predicate)
key := x.SchemaKey(predicate)
txn := pstore.NewTransactionAt(1, false)
txn := pstore.NewTransactionAt(math.MaxUint64, false)
defer txn.Discard()
item, err := txn.Get(key)
if err == badger.ErrKeyNotFound || err == badger.ErrBannedKey {
Expand Down Expand Up @@ -507,7 +508,7 @@ func LoadFromDb() error {
// LoadSchemaFromDb iterates through the DB and loads all the stored schema updates.
func LoadSchemaFromDb() error {
prefix := x.SchemaPrefix()
txn := pstore.NewTransactionAt(1, false)
txn := pstore.NewTransactionAt(math.MaxUint64, false)
defer txn.Discard()
itr := txn.NewIterator(badger.DefaultIteratorOptions) // Need values, reversed=false.
defer itr.Close()
Expand Down Expand Up @@ -543,7 +544,7 @@ func LoadSchemaFromDb() error {
// LoadTypesFromDb iterates through the DB and loads all the stored type updates.
func LoadTypesFromDb() error {
prefix := x.TypePrefix()
txn := pstore.NewTransactionAt(1, false)
txn := pstore.NewTransactionAt(math.MaxUint64, false)
defer txn.Discard()
itr := txn.NewIterator(badger.DefaultIteratorOptions) // Need values, reversed=false.
defer itr.Close()
Expand Down
2 changes: 1 addition & 1 deletion systest/backup/encryption/backup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,13 +104,13 @@ func TestBackupMinioE(t *testing.T) {
t.Log("Pausing to let zero move tablet...")
moveOk := false
for retry := 5; retry > 0; retry-- {
time.Sleep(3 * time.Second)
state, err := testutil.GetStateHttps(testutil.GetAlphaClientConfig(t))
require.NoError(t, err)
if _, ok := state.Groups["1"].Tablets[tabletName]; ok {
moveOk = true
break
}
time.Sleep(1 * time.Second)
}
require.True(t, moveOk)

Expand Down
2 changes: 1 addition & 1 deletion systest/backup/filesystem/backup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -209,13 +209,13 @@ func TestBackupFilesystem(t *testing.T) {
t.Log("Pausing to let zero move tablet...")
moveOk := false
for retry := 5; retry > 0; retry-- {
time.Sleep(3 * time.Second)
state, err := testutil.GetStateHttps(testutil.GetAlphaClientConfig(t))
require.NoError(t, err)
if _, ok := state.Groups["1"].Tablets[x.NamespaceAttr(x.GalaxyNamespace, "movie")]; ok {
moveOk = true
break
}
time.Sleep(1 * time.Second)
}

require.True(t, moveOk)
Expand Down
2 changes: 1 addition & 1 deletion systest/backup/minio-large/backup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,6 @@ func setupTablets(t *testing.T, dg *dgo.Dgraph) {
t.Log("Pausing to let zero move tablets...")
moveOk := false
for retry := 5; retry > 0; retry-- {
time.Sleep(3 * time.Second)
state, err := testutil.GetStateHttps(testutil.GetAlphaClientConfig(t))
require.NoError(t, err)
_, ok1 := state.Groups["1"].Tablets[x.GalaxyAttr("name1")]
Expand All @@ -120,6 +119,7 @@ func setupTablets(t *testing.T, dg *dgo.Dgraph) {
moveOk = true
break
}
time.Sleep(1 * time.Second)
}
require.True(t, moveOk)
}
Expand Down
2 changes: 1 addition & 1 deletion systest/backup/minio/backup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,13 +99,13 @@ func TestBackupMinio(t *testing.T) {
t.Log("Pausing to let zero move tablet...")
moveOk := false
for retry := 5; retry > 0; retry-- {
time.Sleep(3 * time.Second)
state, err := testutil.GetStateHttps(testutil.GetAlphaClientConfig(t))
require.NoError(t, err)
if _, ok := state.Groups["1"].Tablets[x.NamespaceAttr(x.GalaxyNamespace, "movie")]; ok {
moveOk = true
break
}
time.Sleep(1 * time.Second)
}
require.True(t, moveOk)

Expand Down
4 changes: 2 additions & 2 deletions testutil/backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"context"
"fmt"
"io/ioutil"
"math"
"net/http"
"strings"
"testing"
Expand Down Expand Up @@ -178,8 +179,7 @@ func readSchema(pdir string, dType dataType) ([]string, error) {
defer db.Close()
values := make([]string, 0)

// Predicates and types in the schema are written with timestamp 1.
txn := db.NewTransactionAt(1, false)
txn := db.NewTransactionAt(math.MaxUint64, false)
defer txn.Discard()
itr := txn.NewIterator(badger.DefaultIteratorOptions)
defer itr.Close()
Expand Down
2 changes: 1 addition & 1 deletion worker/backup_ee.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ func backupCurrentGroup(ctx context.Context, req *pb.BackupRequest) (*pb.BackupR
return nil, err
}

closer, err := g.Node.startTask(opBackup)
closer, err := g.Node.startTaskAtTs(opBackup, req.ReadTs)
if err != nil {
return nil, errors.Wrapf(err, "cannot start backup operation")
}
Expand Down
3 changes: 1 addition & 2 deletions worker/backup_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -236,8 +236,7 @@ func (pr *BackupProcessor) WriteBackup(ctx context.Context) (*pb.BackupResponse,
}
defer tl.alloc.Release()

// Schema and types are written at Ts=1.
txn := pr.DB.NewTransactionAt(1, false)
txn := pr.DB.NewTransactionAt(pr.Request.ReadTs, false)
defer txn.Discard()
// We don't need to iterate over all versions.
iopts := badger.DefaultIteratorOptions
Expand Down
Loading

0 comments on commit 4165f3d

Please sign in to comment.