Skip to content

Commit

Permalink
fix restore and incr restore
Browse files Browse the repository at this point in the history
  • Loading branch information
shivaji-kharse committed Apr 30, 2024
1 parent 8c4ada1 commit 7280684
Show file tree
Hide file tree
Showing 3 changed files with 140 additions and 18 deletions.
118 changes: 100 additions & 18 deletions systest/vector/backup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ package main
import (
"context"
"fmt"
"slices"
"strings"
"testing"
"time"
Expand Down Expand Up @@ -75,13 +76,6 @@ func TestVectorIncrBackupRestore(t *testing.T) {
incrFrom := i - 1
require.NoError(t, hc.Restore(c, dgraphtest.DefaultBackupDir, "", incrFrom, i))
require.NoError(t, dgraphtest.WaitForRestore(c))

// rebuild index
require.NoError(t, gc.SetupSchema(testSchemaWithoutIndex))
time.Sleep(2 * time.Minute)
require.NoError(t, gc.SetupSchema(testSchema))
time.Sleep(2 * time.Minute)

query := `{
vector(func: has(project_discription_v)) {
count(uid)
Expand All @@ -93,9 +87,9 @@ func TestVectorIncrBackupRestore(t *testing.T) {

if i == 0 {
require.JSONEq(t, fmt.Sprintf(`{"vector":[{"count":%v}]}`, numVectors*5), string(result.GetJson()))
var a [][]float32
var allSpredVec [][]float32
for _, vecArr := range allVectors {
a = append(a, vecArr...)
allSpredVec = append(allSpredVec, vecArr...)
}

for p, vector := range allVectors[i] {
Expand All @@ -110,12 +104,18 @@ func TestVectorIncrBackupRestore(t *testing.T) {
require.Equal(t, numVectors, len(similarVectors))

for _, similarVector := range similarVectors {
require.Contains(t, a, similarVector)
require.Contains(t, allSpredVec, similarVector)
}
}

} else {
require.JSONEq(t, fmt.Sprintf(`{"vector":[{"count":%v}]}`, numVectors), string(result.GetJson()))
require.JSONEq(t, fmt.Sprintf(`{"vector":[{"count":%v}]}`, numVectors*i), string(result.GetJson()))
var allSpredVec [][]float32
for i, vecArr := range allVectors {
if i <= i {
allSpredVec = append(allSpredVec, vecArr...)
}
}
for p, vector := range allVectors[i-1] {
triple := strings.Split(allRdfs[i-1], "\n")[p]
uid := strings.Split(triple, " ")[0]
Expand All @@ -126,10 +126,9 @@ func TestVectorIncrBackupRestore(t *testing.T) {

similarVectors, err := gc.QueryMultipleVectorsUsingSimilarTo(vector, pred, numVectors)
require.NoError(t, err)
require.Equal(t, numVectors, len(similarVectors))

require.GreaterOrEqual(t, len(similarVectors), 10)
for _, similarVector := range similarVectors {
require.Contains(t, allVectors[i-1], similarVector)
require.Contains(t, allSpredVec, similarVector)
}
}
}
Expand Down Expand Up @@ -171,11 +170,94 @@ func TestVectorBackupRestore(t *testing.T) {
require.NoError(t, hc.Restore(c, dgraphtest.DefaultBackupDir, "", 0, 0))
require.NoError(t, dgraphtest.WaitForRestore(c))

// rebuild index
testVectorQuery(t, gc, vectors, rdfs, pred, numVectors)
}

func TestVectorBackupRestoreDropIndex(t *testing.T) {
// setup cluster
conf := dgraphtest.NewClusterConfig().WithNumAlphas(1).WithNumZeros(1).WithReplicas(1).WithACL(time.Hour)
c, err := dgraphtest.NewLocalCluster(conf)
require.NoError(t, err)
defer func() { c.Cleanup(t.Failed()) }()
require.NoError(t, c.Start())

gc, cleanup, err := c.Client()
require.NoError(t, err)
defer cleanup()
require.NoError(t, gc.LoginIntoNamespace(context.Background(),
dgraphtest.DefaultUser, dgraphtest.DefaultPassword, x.GalaxyNamespace))

hc, err := c.HTTPClient()
require.NoError(t, err)
require.NoError(t, hc.LoginIntoNamespace(dgraphtest.DefaultUser,
dgraphtest.DefaultPassword, x.GalaxyNamespace))

// add vector predicate + index
require.NoError(t, gc.SetupSchema(testSchema))
// add data to the vector predicate
numVectors := 3
pred := "project_discription_v"
rdfs, vectors := dgraphtest.GenerateRandomVectors(0, numVectors, 1, pred)
mu := &api.Mutation{SetNquads: []byte(rdfs), CommitNow: true}
_, err = gc.Mutate(mu)
require.NoError(t, err)

t.Log("taking full backup \n")
require.NoError(t, hc.Backup(c, false, dgraphtest.DefaultBackupDir))

// drop index
require.NoError(t, gc.SetupSchema(testSchemaWithoutIndex))
time.Sleep(2 * time.Minute)

// add more data to the vector predicate
rdfs, vectors2 := dgraphtest.GenerateRandomVectors(3, numVectors+3, 1, pred)
mu = &api.Mutation{SetNquads: []byte(rdfs), CommitNow: true}
_, err = gc.Mutate(mu)
require.NoError(t, err)

// delete some entries
mu = &api.Mutation{DelNquads: []byte(strings.Split(rdfs, "\n")[1]), CommitNow: true}
_, err = gc.Mutate(mu)
require.NoError(t, err)

vectors2 = slices.Delete(vectors2, 1, 2)

mu = &api.Mutation{DelNquads: []byte(strings.Split(rdfs, "\n")[0]), CommitNow: true}
_, err = gc.Mutate(mu)
require.NoError(t, err)
vectors2 = slices.Delete(vectors2, 0, 1)

t.Log("taking first incr backup \n")
require.NoError(t, hc.Backup(c, false, dgraphtest.DefaultBackupDir))

// add index
require.NoError(t, gc.SetupSchema(testSchema))
time.Sleep(2 * time.Minute)

testVectorQuery(t, gc, vectors, rdfs, pred, numVectors)
t.Log("taking second incr backup \n")
require.NoError(t, hc.Backup(c, false, dgraphtest.DefaultBackupDir))

// restore backup
t.Log("restoring backup \n")
require.NoError(t, hc.Restore(c, dgraphtest.DefaultBackupDir, "", 0, 0))
require.NoError(t, dgraphtest.WaitForRestore(c))

query := ` {
vectors(func: has(project_discription_v)) {
count(uid)
}
}`
resp, err := gc.Query(query)
require.NoError(t, err)
require.JSONEq(t, `{"vectors":[{"count":4}]}`, string(resp.GetJson()))

require.NoError(t, err)
allVec := append(vectors, vectors2...)

for _, vector := range allVec {

similarVectors, err := gc.QueryMultipleVectorsUsingSimilarTo(vector, pred, 4)
require.NoError(t, err)
for _, similarVector := range similarVectors {
require.Contains(t, allVec, similarVector)
}
}
}
23 changes: 23 additions & 0 deletions worker/backup_ee.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ import (
"github.com/dgraph-io/dgraph/ee/enc"
"github.com/dgraph-io/dgraph/posting"
"github.com/dgraph-io/dgraph/protos/pb"
"github.com/dgraph-io/dgraph/tok/hnsw"
"github.com/dgraph-io/dgraph/x"
"github.com/dgraph-io/ristretto/z"
)
Expand Down Expand Up @@ -194,6 +195,28 @@ func ProcessBackupRequest(ctx context.Context, req *pb.BackupRequest) error {
for pred := range group.Tablets {
predMap[gid] = append(predMap[gid], pred)
}

}

// see if any of the predicates are vector predicates and add the supporting
// vector predicates to the backup request.
vecPredMap := make(map[uint32][]string)
for gid, preds := range predMap {
schema, err := GetSchemaOverNetwork(ctx, &pb.SchemaRequest{Predicates: preds})
if err != nil {
return err
}

for _, pred := range schema {
if pred.Type == "float32vector" && len(pred.IndexSpecs) != 0 {
vecPredMap[gid] = append(predMap[gid], pred.Predicate+hnsw.VecEntry, pred.Predicate+hnsw.VecKeyword,
pred.Predicate+hnsw.VecDead)
}
}
}

for gid, preds := range vecPredMap {
predMap[gid] = append(predMap[gid], preds...)
}

glog.Infof(
Expand Down
17 changes: 17 additions & 0 deletions worker/restore_map.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"os"
"path/filepath"
"strconv"
"strings"
"sync"
"sync/atomic"
"time"
Expand All @@ -44,6 +45,7 @@ import (
"github.com/dgraph-io/dgraph/ee/enc"
"github.com/dgraph-io/dgraph/posting"
"github.com/dgraph-io/dgraph/protos/pb"
"github.com/dgraph-io/dgraph/tok/hnsw"
"github.com/dgraph-io/dgraph/x"
"github.com/dgraph-io/ristretto/z"
)
Expand Down Expand Up @@ -470,6 +472,15 @@ func (m *mapper) processReqCh(ctx context.Context) error {
}
return nil
}

isVectorSupportingPred := func(pred string) bool {
if strings.HasSuffix(pred, hnsw.VecEntry) || strings.HasSuffix(pred, hnsw.VecKeyword) ||
strings.HasSuffix(pred, hnsw.VecDead) {
return true
}
return false
}

// We changed the format of predicate in 2103 and 2105. SchemaUpdate and TypeUpdate have
// predicate stored within them, so they also need to be updated accordingly.
switch in.version {
Expand All @@ -488,6 +499,12 @@ func (m *mapper) processReqCh(ctx context.Context) error {
default:
// for manifest versions >= 2015, do nothing.
}

// If the predicate is a vector supporting predicate, skip further processing.
// currently we don't store vector supporting predicates in the schema.
if isVectorSupportingPred(parsedKey.Attr) {
return nil
}
// Reset the StreamId to prevent ordering issues while writing to stream writer.
kv.StreamId = 0
// Schema and type keys are not stored in an intermediate format so their
Expand Down

0 comments on commit 7280684

Please sign in to comment.