Skip to content

Commit

Permalink
collect rolling stats
Browse files Browse the repository at this point in the history
  • Loading branch information
NamanJain8 committed Jan 7, 2021
1 parent 822623e commit dacdc19
Show file tree
Hide file tree
Showing 3 changed files with 90 additions and 5 deletions.
82 changes: 81 additions & 1 deletion posting/list.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,12 @@ import (
"bytes"
"context"
"encoding/hex"
"fmt"
"log"
"math"
"sort"
"sync"
"time"

"github.com/dgryski/go-farm"
"github.com/pkg/errors"
Expand All @@ -38,6 +41,7 @@ import (
"github.com/dgraph-io/dgraph/types/facets"
"github.com/dgraph-io/dgraph/x"
"github.com/dgraph-io/ristretto/z"
"github.com/golang/glog"
"github.com/golang/protobuf/proto"
)

Expand All @@ -53,6 +57,83 @@ var (
maxListSize = mb / 2
)

type Stat struct {
start time.Time
updated time.Time
added time.Time
count int
}

type Rstats struct {
sync.RWMutex
keys map[string]*Stat
}

var rstat Rstats

func (rstat *Rstats) Update(key []byte) {
rstat.Lock()
defer rstat.Unlock()
nkey := fmt.Sprintf("%x", key)
stat, ok := rstat.keys[nkey]
if !ok {
stat = &Stat{
start: time.Now(),
}
rstat.keys[nkey] = stat
}
stat.updated = time.Now()
stat.count++
}

func (rstat *Rstats) Add(key []byte) {
rstat.Lock()
defer rstat.Unlock()
nkey := fmt.Sprintf("%x", key)
stat, _ := rstat.keys[nkey]
stat.added = time.Now()
}

func (rstat *Rstats) Delete(key []byte) {
rstat.Lock()
defer rstat.Unlock()
nkey := fmt.Sprintf("%x", key)
delete(rstat.keys, nkey)
}

func PrintRstats(lc *z.Closer) {
defer lc.Done()
timer := time.NewTicker(time.Minute)
defer timer.Stop()
printstats := func() {
rstat.RLock()
defer rstat.RUnlock()

now := time.Now()
count := 0
for key, s := range rstat.keys {
count++
start := s.start
if now.Sub(start) > 10*time.Minute {
glog.Infof("key: %s cnt: %d [u: %s, a: %s]", key, s.count,
x.FixedDuration(s.updated.Sub(start)), x.FixedDuration(s.added.Sub(start)))
}
// Print atmax 100 entries
if count >= 100 {
break
}
}
}
for {
select {
case <-lc.HasBeenClosed():
return
case <-timer.C:
printstats()
}
}
}

const (
// Set means overwrite in mutation layer. It contributes 0 in Length.
Set uint32 = 0x01
Expand Down Expand Up @@ -329,7 +410,6 @@ func hasDeleteAll(mpost *pb.Posting) bool {
func (l *List) updateMutationLayer(mpost *pb.Posting, singleUidUpdate bool) error {
l.AssertLock()
x.AssertTrue(mpost.Op == Set || mpost.Op == Del)

// Keys are added to the rollup batches here instead of at the point at which the
// transaction is committed because the transaction context does not keep track
// of the badger keys touched by mutations. It's useful to roll up lists even if
Expand Down
5 changes: 3 additions & 2 deletions posting/lists.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,9 @@ var (
// Init initializes the posting lists package, the in memory and dirty list hash.
func Init(ps *badger.DB, cacheSize int64) {
pstore = ps
closer = z.NewCloser(1)
closer = z.NewCloser(2)
go x.MonitorMemoryMetrics(closer)
go PrintRstats(closer)
// Initialize cache.
if cacheSize == 0 {
return
Expand Down Expand Up @@ -154,7 +155,7 @@ func (lc *LocalCache) SetIfAbsent(key string, updated *List) *List {
}

func (lc *LocalCache) getInternal(key []byte, readFromDisk bool) (*List, error) {
getNewPlistNil := func() (*List, error){
getNewPlistNil := func() (*List, error) {
lc.RLock()
defer lc.RUnlock()
if lc.plists == nil {
Expand Down
8 changes: 6 additions & 2 deletions posting/mvcc.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ func (ir *incrRollupi) rollUpKey(writer *TxnWriter, key []byte) error {
}

func (ir *incrRollupi) addKeyToBatch(key []byte) {
rstat.Add(key)
batch := ir.keysPool.Get().(*[][]byte)
*batch = append(*batch, key)
if len(*batch) < 16 {
Expand Down Expand Up @@ -126,6 +127,7 @@ func (ir *incrRollupi) Process(closer *z.Closer) {
doRollup := func() {
currTs := time.Now().Unix()
for _, key := range *batch {
rstat.Delete(key)
hash := z.MemHash(key)
if elem := m[hash]; currTs-elem >= 10 {
// Key not present or Key present but last roll up was more than 10 sec ago.
Expand All @@ -135,8 +137,9 @@ func (ir *incrRollupi) Process(closer *z.Closer) {
if err := ir.rollUpKey(writer, key); err != nil {
glog.Warningf("Error %v rolling up key %v\n", err, key)
}
} else {
glog.Infof("== [DEBUG] NOT Rolling up key %x", key)
}
glog.Infof("== [DEBUG] NOT Rolling up key %x", key)
}
// clear the batch and put it back in Sync keysPool
*batch = (*batch)[:0]
Expand Down Expand Up @@ -245,7 +248,8 @@ func (txn *Txn) CommitToDisk(writer *TxnWriter, commitTs uint64) error {
// not output anything here.
continue
}
glog.Infof("== [DEBUG] writing key %x (%d)", key, commitTs)
rstat.Update([]byte(key))
// glog.Infof("== [DEBUG] writing key %x (%d)", key, commitTs)
err := btxn.SetEntry(&badger.Entry{
Key: []byte(key),
Value: data,
Expand Down

0 comments on commit dacdc19

Please sign in to comment.