Skip to content

Commit

Permalink
fix(core): add the posting list risteretto cache back
Browse files Browse the repository at this point in the history
  • Loading branch information
harshil-goel committed Oct 3, 2024
1 parent 345c438 commit 98bb094
Show file tree
Hide file tree
Showing 2 changed files with 73 additions and 0 deletions.
30 changes: 30 additions & 0 deletions posting/lists.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,19 @@ package posting

import (
"bytes"
"context"
"fmt"
"sync"
"time"

"github.com/dgraph-io/badger/v4"
"github.com/dgraph-io/dgo/v240/protos/api"
"github.com/dgraph-io/dgraph/v24/protos/pb"
"github.com/dgraph-io/dgraph/v24/tok/index"
"github.com/dgraph-io/dgraph/v24/x"
"github.com/dgraph-io/ristretto"
"github.com/dgraph-io/ristretto/z"
ostats "go.opencensus.io/stats"
)

const (
Expand All @@ -36,6 +40,7 @@ const (
var (
pstore *badger.DB
closer *z.Closer
lCache *ristretto.Cache[[]byte, *List]
)

// Init initializes the posting lists package, the in memory and dirty list hash.
Expand All @@ -45,6 +50,31 @@ func Init(ps *badger.DB, cacheSize int64) {
go x.MonitorMemoryMetrics(closer)

// Initialize cache.
if cacheSize == 0 {
return
}

var err error
lCache, err = ristretto.NewCache[[]byte, *List](&ristretto.Config[[]byte, *List]{
// Use 5% of cache memory for storing counters.
NumCounters: int64(float64(cacheSize) * 0.05 * 2),
MaxCost: int64(float64(cacheSize) * 0.95),
BufferItems: 64,
Metrics: true,
Cost: func(val *List) int64 {
return 0
},
})
x.Check(err)
go func() {
m := lCache.Metrics
ticker := time.NewTicker(10 * time.Second)
defer ticker.Stop()
for range ticker.C {
// Record the posting list cache hit ratio
ostats.Record(context.Background(), x.PLCacheHitRatio.M(m.Ratio()))
}
}()
}

func UpdateMaxCost(maxCost int64) {
Expand Down
43 changes: 43 additions & 0 deletions posting/mvcc.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,8 @@ func (ir *incrRollupi) rollUpKey(writer *TxnWriter, key []byte) error {
return err
}

RemoveCacheFor(key)

globalCache.Lock()
val, ok := globalCache.items[string(key)]
if ok {
Expand Down Expand Up @@ -344,11 +346,22 @@ func (txn *Txn) CommitToDisk(writer *TxnWriter, commitTs uint64) error {
}

func ResetCache() {
if lCache != nil {
lCache.Clear()
}
globalCache.Lock()
globalCache.items = make(map[string]*CachePL)
globalCache.Unlock()
}

// RemoveCacheFor will delete the list corresponding to the given key.
func RemoveCacheFor(key []byte) {
// TODO: investigate if this can be done by calling Set with a nil value.
if lCache != nil {
lCache.Del(key)
}
}

func NewCachePL() *CachePL {
return &CachePL{
count: 0,
Expand Down Expand Up @@ -537,7 +550,33 @@ func ShouldGoInCache(pk x.ParsedKey) bool {
return (!pk.IsData() && strings.HasSuffix(pk.Attr, "dgraph.type"))
}

func PostingListCacheEnabled() bool {
return lCache != nil
}

func getNew(key []byte, pstore *badger.DB, readTs uint64) (*List, error) {
if PostingListCacheEnabled() {
l, ok := lCache.Get(key)
if ok && l != nil {
// No need to clone the immutable layer or the key since mutations will not modify it.
lCopy := &List{
minTs: l.minTs,
maxTs: l.maxTs,
key: key,
plist: l.plist,
}
l.RLock()
if l.mutationMap != nil {
lCopy.mutationMap = make(map[uint64]*pb.PostingList, len(l.mutationMap))
for ts, pl := range l.mutationMap {
lCopy.mutationMap[ts] = proto.Clone(pl).(*pb.PostingList)
}
}
l.RUnlock()
return lCopy, nil
}
}

if pstore.IsClosed() {
return nil, badger.ErrDBClosed
}
Expand Down Expand Up @@ -604,5 +643,9 @@ func getNew(key []byte, pstore *badger.DB, readTs uint64) (*List, error) {
globalCache.Unlock()
}

if PostingListCacheEnabled() {
lCache.Set(key, l, 0)
}

return l, nil
}

0 comments on commit 98bb094

Please sign in to comment.