From dacdc19f0e80262339b09c7b412f97071ab518db Mon Sep 17 00:00:00 2001
From: NamanJain8
Date: Fri, 8 Jan 2021 01:39:49 +0530
Subject: [PATCH] collect rolling stats

---
 posting/list.go  | 93 +++++++++++++++++++++++++++++++++++++++++++++++-
 posting/lists.go |  5 +--
 posting/mvcc.go  |  8 +++--
 3 files changed, 101 insertions(+), 5 deletions(-)

diff --git a/posting/list.go b/posting/list.go
index 90881dd5b6f..d37fd965045 100644
--- a/posting/list.go
+++ b/posting/list.go
@@ -20,9 +20,12 @@ import (
 	"bytes"
 	"context"
 	"encoding/hex"
+	"fmt"
 	"log"
 	"math"
 	"sort"
+	"sync"
+	"time"
 
 	"github.com/dgryski/go-farm"
 	"github.com/pkg/errors"
@@ -38,6 +41,7 @@ import (
 	"github.com/dgraph-io/dgraph/types/facets"
 	"github.com/dgraph-io/dgraph/x"
 	"github.com/dgraph-io/ristretto/z"
+	"github.com/golang/glog"
 	"github.com/golang/protobuf/proto"
 )
 
@@ -53,6 +57,94 @@ var (
 	maxListSize = mb / 2
 )
 
+// Stat records rollup-tracking timestamps for a single posting-list key.
+type Stat struct {
+	start   time.Time // when the key was first seen by Rstats.Update
+	updated time.Time // most recent Rstats.Update call
+	added   time.Time // most recent Rstats.Add call; zero if Add never matched
+	count   int       // number of Rstats.Update calls
+}
+
+// Rstats is a mutex-guarded set of per-key Stat entries, indexed by the
+// hex-encoded key.
+type Rstats struct {
+	sync.RWMutex
+	keys map[string]*Stat
+}
+
+// rstat is the package-wide collector. The map must be allocated up front:
+// Update writes into it, and a write to a nil map panics.
+var rstat = Rstats{keys: make(map[string]*Stat)}
+
+// Update bumps the counter and the updated timestamp for key, creating the
+// entry (and stamping its start time) on first sight.
+func (r *Rstats) Update(key []byte) {
+	r.Lock()
+	defer r.Unlock()
+	nkey := fmt.Sprintf("%x", key)
+	stat, ok := r.keys[nkey]
+	if !ok {
+		stat = &Stat{start: time.Now()}
+		r.keys[nkey] = stat
+	}
+	stat.updated = time.Now()
+	stat.count++
+}
+
+// Add stamps the added time on an existing entry for key. Keys that have no
+// entry (never passed to Update, or already removed by Delete) are ignored;
+// dereferencing the missing entry would otherwise panic.
+func (r *Rstats) Add(key []byte) {
+	r.Lock()
+	defer r.Unlock()
+	if stat, ok := r.keys[fmt.Sprintf("%x", key)]; ok {
+		stat.added = time.Now()
+	}
+}
+
+// Delete drops the entry for key, if any.
+func (r *Rstats) Delete(key []byte) {
+	r.Lock()
+	defer r.Unlock()
+	delete(r.keys, fmt.Sprintf("%x", key))
+}
+
+// PrintRstats logs, once a minute, up to 100 tracked keys whose entry is more
+// than ten minutes old. It returns once lc is closed and calls lc.Done on exit.
+func PrintRstats(lc *z.Closer) {
+	defer lc.Done()
+	timer := time.NewTicker(time.Minute)
+	defer timer.Stop()
+	printstats := func() {
+		rstat.RLock()
+		defer rstat.RUnlock()
+
+		now := time.Now()
+		count := 0
+		for key, s := range rstat.keys {
+			start := s.start
+			if now.Sub(start) <= 10*time.Minute {
+				continue
+			}
+			glog.Infof("key: %s cnt: %d [u: %s, a: %s]", key, s.count,
+				x.FixedDuration(s.updated.Sub(start)), x.FixedDuration(s.added.Sub(start)))
+			// Print at most 100 entries; only printed entries count toward the cap.
+			count++
+			if count >= 100 {
+				break
+			}
+		}
+	}
+	for {
+		select {
+		case <-lc.HasBeenClosed():
+			return
+		case <-timer.C:
+			printstats()
+		}
+	}
+}
+
 const (
 	// Set means overwrite in mutation layer. It contributes 0 in Length.
 	Set uint32 = 0x01
@@ -329,7 +421,6 @@ func hasDeleteAll(mpost *pb.Posting) bool {
 func (l *List) updateMutationLayer(mpost *pb.Posting, singleUidUpdate bool) error {
 	l.AssertLock()
 	x.AssertTrue(mpost.Op == Set || mpost.Op == Del)
-
 	// Keys are added to the rollup batches here instead of at the point at which the
 	// transaction is committed because the transaction context does not keep track
 	// of the badger keys touched by mutations. It's useful to roll up lists even if
diff --git a/posting/lists.go b/posting/lists.go
index 8e78be45656..66f895eb16c 100644
--- a/posting/lists.go
+++ b/posting/lists.go
@@ -46,8 +46,9 @@ var (
 // Init initializes the posting lists package, the in memory and dirty list hash.
 func Init(ps *badger.DB, cacheSize int64) {
 	pstore = ps
-	closer = z.NewCloser(1)
+	closer = z.NewCloser(2)
 	go x.MonitorMemoryMetrics(closer)
+	go PrintRstats(closer)
 	// Initialize cache.
 	if cacheSize == 0 {
 		return
@@ -154,7 +155,7 @@ func (lc *LocalCache) SetIfAbsent(key string, updated *List) *List {
 }
 
 func (lc *LocalCache) getInternal(key []byte, readFromDisk bool) (*List, error) {
-	getNewPlistNil := func() (*List, error){
+	getNewPlistNil := func() (*List, error) {
 		lc.RLock()
 		defer lc.RUnlock()
 		if lc.plists == nil {
diff --git a/posting/mvcc.go b/posting/mvcc.go
index 76bc58c0b88..ad79b9eebb8 100644
--- a/posting/mvcc.go
+++ b/posting/mvcc.go
@@ -87,6 +87,7 @@ func (ir *incrRollupi) rollUpKey(writer *TxnWriter, key []byte) error {
 }
 
 func (ir *incrRollupi) addKeyToBatch(key []byte) {
+	rstat.Add(key)
 	batch := ir.keysPool.Get().(*[][]byte)
 	*batch = append(*batch, key)
 	if len(*batch) < 16 {
@@ -126,6 +127,7 @@ func (ir *incrRollupi) Process(closer *z.Closer) {
 	doRollup := func() {
 		currTs := time.Now().Unix()
 		for _, key := range *batch {
+			rstat.Delete(key)
 			hash := z.MemHash(key)
 			if elem := m[hash]; currTs-elem >= 10 {
 				// Key not present or Key present but last roll up was more than 10 sec ago.
@@ -135,8 +137,9 @@ func (ir *incrRollupi) Process(closer *z.Closer) {
 				if err := ir.rollUpKey(writer, key); err != nil {
 					glog.Warningf("Error %v rolling up key %v\n", err, key)
 				}
+			} else {
+				glog.Infof("== [DEBUG] NOT Rolling up key %x", key)
 			}
-			glog.Infof("== [DEBUG] NOT Rolling up key %x", key)
 		}
 		// clear the batch and put it back in Sync keysPool
 		*batch = (*batch)[:0]
@@ -245,7 +248,8 @@ func (txn *Txn) CommitToDisk(writer *TxnWriter, commitTs uint64) error {
 					// not output anything here.
 					continue
 				}
-				glog.Infof("== [DEBUG] writing key %x (%d)", key, commitTs)
+				rstat.Update([]byte(key))
+				// glog.Infof("== [DEBUG] writing key %x (%d)", key, commitTs)
 				err := btxn.SetEntry(&badger.Entry{
 					Key:      []byte(key),
 					Value:    data,