Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix and update metrics in badger #1948

Merged
merged 23 commits into from
Jul 18, 2023
Merged
2 changes: 2 additions & 0 deletions db.go
Original file line number Diff line number Diff line change
Expand Up @@ -739,6 +739,7 @@ func (db *DB) get(key []byte) (y.ValueStruct, error) {
}
// Found the required version of the key, return immediately.
if vs.Version == version {
harshil-goel marked this conversation as resolved.
Show resolved Hide resolved
y.NumGetsWithResultsAdd(db.opt.MetricsEnabled, 1)
harshil-goel marked this conversation as resolved.
Show resolved Hide resolved
return vs, nil
}
if maxVs.Version < vs.Version {
Expand Down Expand Up @@ -862,6 +863,7 @@ func (db *DB) sendToWriteCh(entries []*Entry) (*request, error) {
size += e.estimateSizeAndSetThreshold(db.valueThreshold())
count++
}
y.NumBytesWrittenUserAdd(db.opt.MetricsEnabled, int64(size))
harshil-goel marked this conversation as resolved.
Show resolved Hide resolved
if count >= db.opt.maxBatchCount || size >= db.opt.maxBatchSize {
return nil, ErrTxnTooBig
}
Expand Down
2 changes: 2 additions & 0 deletions iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -475,6 +475,8 @@ func (txn *Txn) NewIterator(opt IteratorOptions) *Iterator {
panic(ErrDBClosed)
}

y.NumIteratorsCreatedAdd(txn.db.opt.MetricsEnabled, 1)

// Keep track of the number of active iterators.
txn.numIterators.Add(1)

Expand Down
24 changes: 20 additions & 4 deletions levels.go
Original file line number Diff line number Diff line change
Expand Up @@ -1439,6 +1439,18 @@ func (s *levelsController) runCompactDef(id, l int, cd compactDef) (err error) {
return err
}

getSizes := func(tables []*table.Table) int64 {
mangalaman93 marked this conversation as resolved.
Show resolved Hide resolved
size := int64(0)
for _, i := range tables {
size += i.Size()
}
return size
}

sizeNewTables := getSizes(newTables)
sizeOldTables := getSizes(cd.bot) + getSizes(cd.top)
y.NumBytesCompactionWrittenAdd(s.kv.opt.MetricsEnabled, nextLevel.strLevel, sizeNewTables)

// See comment earlier in this function about the ordering of these ops, and the order in which
// we access levels when reading.
if err := nextLevel.replaceTables(cd.bot, newTables); err != nil {
Expand All @@ -1459,16 +1471,16 @@ func (s *levelsController) runCompactDef(id, l int, cd compactDef) (err error) {
expensive = " [E]"
}
s.kv.opt.Infof("[%d]%s LOG Compact %d->%d (%d, %d -> %d tables with %d splits)."+
" [%s] -> [%s], took %v\n",
" [%s] -> [%s], took %v\n, deleted %d bytes",
id, expensive, thisLevel.level, nextLevel.level, len(cd.top), len(cd.bot),
len(newTables), len(cd.splits), strings.Join(from, " "), strings.Join(to, " "),
dur.Round(time.Millisecond))
dur.Round(time.Millisecond), sizeOldTables-sizeNewTables)
harshil-goel marked this conversation as resolved.
Show resolved Hide resolved
}

if cd.thisLevel.level != 0 && len(newTables) > 2*s.kv.opt.LevelSizeMultiplier {
s.kv.opt.Debugf("This Range (numTables: %d)\nLeft:\n%s\nRight:\n%s\n",
s.kv.opt.Infof("This Range (numTables: %d)\nLeft:\n%s\nRight:\n%s\n",
len(cd.top), hex.Dump(cd.thisRange.left), hex.Dump(cd.thisRange.right))
s.kv.opt.Debugf("Next Range (numTables: %d)\nLeft:\n%s\nRight:\n%s\n",
s.kv.opt.Infof("Next Range (numTables: %d)\nLeft:\n%s\nRight:\n%s\n",
len(cd.bot), hex.Dump(cd.nextRange.left), hex.Dump(cd.nextRange.right))
}
return nil
Expand Down Expand Up @@ -1598,13 +1610,17 @@ func (s *levelsController) get(key []byte, maxVs y.ValueStruct, startLevel int)
if vs.Value == nil && vs.Meta == 0 {
continue
}
y.NumBytesReadsLSMAdd(s.kv.opt.MetricsEnabled, int64(len(vs.Value)))
if vs.Version == version {
return vs, nil
}
if maxVs.Version < vs.Version {
maxVs = vs
}
}
if len(maxVs.Value) > 0 {
y.NumGetsWithResultsAdd(s.kv.opt.MetricsEnabled, 1)
}
return maxVs, nil
}

Expand Down
5 changes: 1 addition & 4 deletions memtable.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,7 @@ func (mt *memTable) Put(key []byte, value y.ValueStruct) error {
if ts := y.ParseTs(entry.Key); ts > mt.maxVersion {
mt.maxVersion = ts
}
y.NumBytesWrittenToL0Add(mt.opt.MetricsEnabled, entry.estimateSizeAndSetThreshold(mt.opt.ValueThreshold))
return nil
}

Expand Down Expand Up @@ -388,7 +389,6 @@ func (lf *logFile) encryptionEnabled() bool {

// Acquire lock on mmap/file if you are calling this
func (lf *logFile) read(p valuePointer) (buf []byte, err error) {
var nbr int64
offset := p.Offset
// Do not convert size to uint32, because the lf.Data can be of size
// 4GB, which overflows the uint32 during conversion to make the size 0,
Expand All @@ -404,10 +404,7 @@ func (lf *logFile) read(p valuePointer) (buf []byte, err error) {
err = y.ErrEOF
} else {
buf = lf.Data[offset : offset+valsz]
nbr = int64(valsz)
}
y.NumReadsAdd(lf.opt.MetricsEnabled, 1)
y.NumBytesReadAdd(lf.opt.MetricsEnabled, nbr)
return buf, err
}

Expand Down
203 changes: 203 additions & 0 deletions metrics_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,203 @@
/*
* Copyright 2017 Dgraph Labs, Inc. and Contributors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package badger

import (
"expvar"
"math/rand"
"testing"

"github.com/stretchr/testify/require"
)

func TestWriteMetrics(t *testing.T) {
opt := getTestOptions("")
opt.managedTxns = true
opt.CompactL0OnClose = true
runBadgerTest(t, &opt, func(t *testing.T, db *DB) {
ClearAllMetrics()
num := 10
val := make([]byte, 1<<12)
key := make([]byte, 40)
for i := 0; i < num; i++ {
_, err := rand.Read(key)
require.NoError(t, err)
_, err = rand.Read(val)
require.NoError(t, err)

writer := db.NewManagedWriteBatch()
require.NoError(t, writer.SetEntryAt(NewEntry(key, val), 1))
writer.Flush()
}

expectedSize := int64(len(val)) + 48 + 2 // 48 := size of key (40 + 8(ts)), 2 := meta
write_metric := expvar.Get("badger_v4_write_user")
require.Equal(t, expectedSize*int64(num), write_metric.(*expvar.Int).Value())

put_metric := expvar.Get("badger_v4_puts_total")
require.Equal(t, int64(num), put_metric.(*expvar.Int).Value())

lsm_metric := expvar.Get("badger_v4_written_to_l0")
require.Equal(t, expectedSize*int64(num), lsm_metric.(*expvar.Int).Value())

compactionMetric := expvar.Get("badger_v4_compaction_written_bytes").(*expvar.Map)
harshil-goel marked this conversation as resolved.
Show resolved Hide resolved
require.Equal(t, nil, compactionMetric.Get("l6"))

// Force compaction
db.Close()

_, err := OpenManaged(opt)
require.Nil(t, err)

compactionMetric = expvar.Get("badger_v4_compaction_written_bytes").(*expvar.Map)
require.GreaterOrEqual(t, expectedSize*int64(num)+int64(num*200), compactionMetric.Get("l6").(*expvar.Int).Value())
// Because we have random values, compression is not able to do much, so we incur a cost on total size
})
}

func TestVlogMetris(t *testing.T) {
opt := getTestOptions("")
opt.managedTxns = true
opt.CompactL0OnClose = true
runBadgerTest(t, &opt, func(t *testing.T, db *DB) {
ClearAllMetrics()
num := 10
val := make([]byte, 1<<20) // Large Value
key := make([]byte, 40)
for i := 0; i < num; i++ {
_, err := rand.Read(key)
require.NoError(t, err)
_, err = rand.Read(val)
require.NoError(t, err)

writer := db.NewManagedWriteBatch()
require.NoError(t, writer.SetEntryAt(NewEntry(key, val), 1))
writer.Flush()
}

expectedSize := int64(len(val)) + 200 // vlog expected size

totalWrites := expvar.Get("badger_v4_disk_writes_total")
require.Equal(t, int64(num), totalWrites.(*expvar.Int).Value())

bytesWritten := expvar.Get("badger_v4_vlog_written_bytes")
require.GreaterOrEqual(t, expectedSize*int64(num), bytesWritten.(*expvar.Int).Value())

txn := db.NewTransactionAt(2, false)
item, err := txn.Get(key)
require.NoError(t, err)
require.Equal(t, uint64(1), item.Version())

err = item.Value(func(val []byte) error {
totalReads := expvar.Get("badger_v4_disk_reads_total")
bytesRead := expvar.Get("badger_v4_read_bytes_vlog")
require.Equal(t, int64(1), totalReads.(*expvar.Int).Value())
require.GreaterOrEqual(t, expectedSize, bytesRead.(*expvar.Int).Value())
return nil
})

require.Nil(t, err)
harshil-goel marked this conversation as resolved.
Show resolved Hide resolved
})
}

func ClearAllMetrics() {
harshil-goel marked this conversation as resolved.
Show resolved Hide resolved
expvar.Do(func(kv expvar.KeyValue) {
// Reset the value of each expvar variable based on its type
switch v := kv.Value.(type) {
case *expvar.Int:
v.Set(0)
case *expvar.Float:
v.Set(0)
case *expvar.Map:
v.Init()
case *expvar.String:
v.Set("")
}
})
}

func TestReadMetrics(t *testing.T) {
opt := getTestOptions("")
opt.managedTxns = true
opt.CompactL0OnClose = true
runBadgerTest(t, &opt, func(t *testing.T, db *DB) {
ClearAllMetrics()
num := 10
val := make([]byte, 1<<15)
keys := [][]byte{}
writer := db.NewManagedWriteBatch()
for i := 0; i < num; i++ {
keyB := key("byte", 1)
keys = append(keys, []byte(keyB))

_, err := rand.Read(val)
require.NoError(t, err)

require.NoError(t, writer.SetEntryAt(NewEntry([]byte(keyB), val), 1))

harshil-goel marked this conversation as resolved.
Show resolved Hide resolved
}
writer.Flush()

txn := db.NewTransactionAt(2, false)
item, err := txn.Get(keys[0])
require.NoError(t, err)

totalGets := expvar.Get("badger_v4_gets_total")
require.Equal(t, int64(1), totalGets.(*expvar.Int).Value())

totalMemtableReads := expvar.Get("badger_v4_memtable_gets_total")
harshil-goel marked this conversation as resolved.
Show resolved Hide resolved
require.Equal(t, int64(1), totalMemtableReads.(*expvar.Int).Value())

totalLSMGets := expvar.Get("badger_v4_lsm_level_gets_total")
require.Nil(t, totalLSMGets.(*expvar.Map).Get("l6"))

// Force compaction
db.Close()

db, err = OpenManaged(opt)
require.Nil(t, err)

txn = db.NewTransactionAt(2, false)
item, err = txn.Get(keys[0])
require.NoError(t, err)
require.Equal(t, uint64(1), item.Version())

_, err = txn.Get([]byte(key("abdbyte", 1000))) // val should be far enough that bloom filter doesn't hit
require.NotNil(t, err)
harshil-goel marked this conversation as resolved.
Show resolved Hide resolved

totalLSMGets = expvar.Get("badger_v4_lsm_level_gets_total")
require.Equal(t, int64(0x1), totalLSMGets.(*expvar.Map).Get("l6").(*expvar.Int).Value())

totalBloom := expvar.Get("badger_v4_lsm_bloom_hits_total")
require.Equal(t, int64(0x1), totalBloom.(*expvar.Map).Get("l6").(*expvar.Int).Value())
require.Equal(t, int64(0x1), totalBloom.(*expvar.Map).Get("DoesNotHave_HIT").(*expvar.Int).Value())
require.Equal(t, int64(0x2), totalBloom.(*expvar.Map).Get("DoesNotHave_ALL").(*expvar.Int).Value())

bytesLSM := expvar.Get("badger_v4_read_bytes_lsm")
require.Equal(t, int64(len(val)), bytesLSM.(*expvar.Int).Value())

getWithResult := expvar.Get("badger_v4_get_results")
harshil-goel marked this conversation as resolved.
Show resolved Hide resolved
require.Equal(t, int64(2), getWithResult.(*expvar.Int).Value())

iterOpts := DefaultIteratorOptions
iter := txn.NewKeyIterator(keys[0], iterOpts)
iter.Seek(keys[0])

rangeQueries := expvar.Get("badger_v4_iterators")
harshil-goel marked this conversation as resolved.
Show resolved Hide resolved
require.Equal(t, int64(1), rangeQueries.(*expvar.Int).Value())
})
}
6 changes: 4 additions & 2 deletions value.go
Original file line number Diff line number Diff line change
Expand Up @@ -890,8 +890,8 @@ func (vlog *valueLog) write(reqs []*request) error {
bytesWritten += buf.Len()
// No need to flush anything, we write to file directly via mmap.
}
y.NumWritesAdd(vlog.opt.MetricsEnabled, int64(written))
y.NumBytesWrittenAdd(vlog.opt.MetricsEnabled, int64(bytesWritten))
y.NumWritesVlogAdd(vlog.opt.MetricsEnabled, int64(written))
y.NumBytesWrittenVlogAdd(vlog.opt.MetricsEnabled, int64(bytesWritten))

vlog.numEntriesWritten += uint32(written)
vlog.db.threshold.update(valueSizes)
Expand Down Expand Up @@ -991,6 +991,8 @@ func (vlog *valueLog) readValueBytes(vp valuePointer) ([]byte, *logFile, error)
}

buf, err := lf.read(vp)
y.NumReadsVlogAdd(vlog.db.opt.MetricsEnabled, 1)
y.NumBytesReadsVlogAdd(vlog.db.opt.MetricsEnabled, int64(len(buf)))
return buf, lf, err
}

Expand Down
Loading