diff --git a/posting/lists.go b/posting/lists.go index a232cc5aca4..9a20a917900 100644 --- a/posting/lists.go +++ b/posting/lists.go @@ -18,15 +18,19 @@ package posting import ( "bytes" + "context" "fmt" "sync" + "time" "github.com/dgraph-io/badger/v4" "github.com/dgraph-io/dgo/v240/protos/api" "github.com/dgraph-io/dgraph/v24/protos/pb" "github.com/dgraph-io/dgraph/v24/tok/index" "github.com/dgraph-io/dgraph/v24/x" + "github.com/dgraph-io/ristretto" "github.com/dgraph-io/ristretto/z" + ostats "go.opencensus.io/stats" ) const ( @@ -36,6 +40,7 @@ const ( var ( pstore *badger.DB closer *z.Closer + lCache *ristretto.Cache[[]byte, *List] ) // Init initializes the posting lists package, the in memory and dirty list hash. @@ -45,6 +50,31 @@ func Init(ps *badger.DB, cacheSize int64) { go x.MonitorMemoryMetrics(closer) // Initialize cache. + if cacheSize == 0 { + return + } + + var err error + lCache, err = ristretto.NewCache[[]byte, *List](&ristretto.Config[[]byte, *List]{ + // Use 5% of cache memory for storing counters. + NumCounters: int64(float64(cacheSize) * 0.05 * 2), + MaxCost: int64(float64(cacheSize) * 0.95), + BufferItems: 64, + Metrics: true, + Cost: func(val *List) int64 { + return 0 + }, + }) + x.Check(err) + go func() { + m := lCache.Metrics + ticker := time.NewTicker(10 * time.Second) + defer ticker.Stop() + for range ticker.C { + // Record the posting list cache hit ratio + ostats.Record(context.Background(), x.PLCacheHitRatio.M(m.Ratio())) + } + }() } func UpdateMaxCost(maxCost int64) { diff --git a/posting/mvcc.go b/posting/mvcc.go index 335876ae2e7..42ef69557b8 100644 --- a/posting/mvcc.go +++ b/posting/mvcc.go @@ -133,6 +133,8 @@ func (ir *incrRollupi) rollUpKey(writer *TxnWriter, key []byte) error { return err } + RemoveCacheFor(key) + globalCache.Lock() val, ok := globalCache.items[string(key)] if ok { @@ -344,11 +346,22 @@ func (txn *Txn) CommitToDisk(writer *TxnWriter, commitTs uint64) error { } func ResetCache() { + if lCache != nil { + lCache.Clear() + } globalCache.Lock() globalCache.items = make(map[string]*CachePL) globalCache.Unlock() } +// RemoveCacheFor will delete the list corresponding to the given key. +func RemoveCacheFor(key []byte) { + // TODO: investigate if this can be done by calling Set with a nil value. + if lCache != nil { + lCache.Del(key) + } +} + func NewCachePL() *CachePL { return &CachePL{ count: 0, @@ -537,7 +550,33 @@ func ShouldGoInCache(pk x.ParsedKey) bool { return (!pk.IsData() && strings.HasSuffix(pk.Attr, "dgraph.type")) } +func PostingListCacheEnabled() bool { + return lCache != nil +} + func getNew(key []byte, pstore *badger.DB, readTs uint64) (*List, error) { + if PostingListCacheEnabled() { + l, ok := lCache.Get(key) + if ok && l != nil { + // No need to clone the immutable layer or the key since mutations will not modify it. + lCopy := &List{ + minTs: l.minTs, + maxTs: l.maxTs, + key: key, + plist: l.plist, + } + l.RLock() + if l.mutationMap != nil { + lCopy.mutationMap = make(map[uint64]*pb.PostingList, len(l.mutationMap)) + for ts, pl := range l.mutationMap { + lCopy.mutationMap[ts] = proto.Clone(pl).(*pb.PostingList) + } + } + l.RUnlock() + return lCopy, nil + } + } + if pstore.IsClosed() { return nil, badger.ErrDBClosed } @@ -604,5 +643,9 @@ func getNew(key []byte, pstore *badger.DB, readTs uint64) (*List, error) { globalCache.Unlock() } + if PostingListCacheEnabled() { + lCache.Set(key, l, 0) + } + return l, nil }