Skip to content

Commit

Permalink
fix(subscriptions): fix subscription to use the kv with the max versi…
Browse files Browse the repository at this point in the history
…on (#7349)

Issue:
The last KV from the list received from subscription need not be the latest update. This is because of the KVs written at the top due to value log rewrites or due to rollups.
We were using the last KV from the updates received via subscription.

Fix:
Iterate over the KVs to get the KV with the latest version.

(cherry picked from commit 4a2bc36)
  • Loading branch information
NamanJain8 committed Jan 22, 2021
1 parent a3eff77 commit 78544cf
Show file tree
Hide file tree
Showing 4 changed files with 33 additions and 7 deletions.
5 changes: 2 additions & 3 deletions dgraph/cmd/alpha/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -808,9 +808,8 @@ func listenForCorsUpdate(closer *z.Closer) {
// Remove uid from the key, to get the correct prefix
prefix = prefix[:len(prefix)-8]
worker.SubscribeForUpdates([][]byte{prefix}, func(kvs *badgerpb.KVList) {
// Last update contains the latest value. So, taking the last update.
lastIdx := len(kvs.GetKv()) - 1
kv := kvs.GetKv()[lastIdx]

kv := x.KvWithMaxVersion(kvs, [][]byte{prefix}, "CORS Subscription")
glog.Infof("Updating cors from subscription.")
// Unmarshal the incoming posting list.
pl := &pb.PostingList{}
Expand Down
3 changes: 2 additions & 1 deletion edgraph/access_ee.go
Original file line number Diff line number Diff line change
Expand Up @@ -351,7 +351,8 @@ func RefreshAcls(closer *z.Closer) {
if kvs == nil || len(kvs.Kv) == 0 {
return
}
if err := retrieveAcls(kvs.Kv[0].Version); err != nil {
kv := x.KvWithMaxVersion(kvs, aclPrefixes, "ACL Subscription")
if err := retrieveAcls(kv.GetVersion()); err != nil {
glog.Errorf("Error while retrieving acls: %v", err)
}
}, 1, closer)
Expand Down
4 changes: 1 addition & 3 deletions graphql/admin/admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -504,10 +504,8 @@ func newAdminResolver(
prefix = prefix[:len(prefix)-8]
// Listen for graphql schema changes in group 1.
go worker.SubscribeForUpdates([][]byte{prefix}, func(kvs *badgerpb.KVList) {
// Last update contains the latest value. So, taking the last update.
lastIdx := len(kvs.GetKv()) - 1
kv := kvs.GetKv()[lastIdx]

kv := x.KvWithMaxVersion(kvs, [][]byte{prefix}, "GraphQL Schema Subscription")
glog.Infof("Updating GraphQL schema from subscription.")

// Unmarshal the incoming posting list.
Expand Down
28 changes: 28 additions & 0 deletions x/x.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ import (

"github.com/dgraph-io/badger/v3"
bo "github.com/dgraph-io/badger/v3/options"
badgerpb "github.com/dgraph-io/badger/v3/pb"
"github.com/dgraph-io/dgo/v200"
"github.com/dgraph-io/dgo/v200/protos/api"
"github.com/dgraph-io/ristretto/z"
Expand Down Expand Up @@ -1202,3 +1203,30 @@ func ToHex(i uint64, rdf bool) []byte {

return out
}

// KvWithMaxVersion returns a KV with the max version from the list of KVs.
func KvWithMaxVersion(kvs *badgerpb.KVList, prefixes [][]byte, tag string) *badgerpb.KV {
hasAnyPrefix := func(key []byte) bool {
for _, prefix := range prefixes {
if bytes.HasPrefix(key, prefix) {
return true
}
}
return false
}
// Iterate over kvs to get the KV with the latest version. It is not necessary that the last
// KV contain the latest value.
var maxKv *badgerpb.KV
for _, kv := range kvs.GetKv() {
if !hasAnyPrefix(kv.GetKey()) {
// Verify that we got the key which was subscribed. This shouldn't happen, but added for
// robustness.
glog.Errorf("[%s] Got key: %x which was not subscribed", tag, kv.GetKey())
continue
}
if maxKv.GetVersion() <= kv.GetVersion() {
maxKv = kv
}
}
return maxKv
}

0 comments on commit 78544cf

Please sign in to comment.