Skip to content
This repository has been archived by the owner on Aug 2, 2021. It is now read-only.

swarm/shed: add metrics to each shed db #1029

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
208 changes: 205 additions & 3 deletions swarm/shed/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,26 +23,48 @@
package shed

import (
"errors"
"fmt"
"strconv"
"strings"
"sync"
"time"

"github.com/ethereum/go-ethereum/metrics"
"github.com/ethereum/go-ethereum/swarm/log"
"github.com/syndtr/goleveldb/leveldb"
"github.com/syndtr/goleveldb/leveldb/iterator"
"github.com/syndtr/goleveldb/leveldb/opt"
)

// The limit for LevelDB OpenFilesCacheCapacity.
const openFileLimit = 128
const (
openFileLimit = 128 // The limit for LevelDB OpenFilesCacheCapacity.
writePauseWarningThrottler = 1 * time.Minute
)

// DB provides abstractions over LevelDB in order to
// implement complex structures using fields and ordered indexes.
// It provides a schema functionality to store fields and indexes
// information about naming and types.
type DB struct {
ldb *leveldb.DB

compTimeMeter metrics.Meter // Meter for measuring the total time spent in database compaction
compReadMeter metrics.Meter // Meter for measuring the data read during compaction
compWriteMeter metrics.Meter // Meter for measuring the data written during compaction
writeDelayNMeter metrics.Meter // Meter for measuring the write delay number due to database compaction
writeDelayMeter metrics.Meter // Meter for measuring the write delay duration due to database compaction
diskReadMeter metrics.Meter // Meter for measuring the effective amount of data read
diskWriteMeter metrics.Meter // Meter for measuring the effective amount of data written

quitLock sync.Mutex // Mutex protecting the quit channel access
quitChan chan chan error // Quit channel to stop the metrics collection before closing the database
}

// NewDB constructs a new DB and validates the schema
// if it exists in database on the given path.
func NewDB(path string) (db *DB, err error) {
// metricsPrefix is used for metrics collection for the given DB.
func NewDB(path string, metricsPrefix string) (db *DB, err error) {
ldb, err := leveldb.OpenFile(path, &opt.Options{
OpenFilesCacheCapacity: openFileLimit,
})
Expand All @@ -66,6 +88,9 @@ func NewDB(path string) (db *DB, err error) {
return nil, err
}
}

db.Meter(metricsPrefix)

return db, nil
}

Expand Down Expand Up @@ -126,5 +151,182 @@ func (db *DB) WriteBatch(batch *leveldb.Batch) (err error) {

// Close closes LevelDB database.
func (db *DB) Close() (err error) {
close(db.quitChan)
return db.ldb.Close()
}

// Meter configures the database metrics collectors
func (db *DB) Meter(prefix string) {
// Initialize all the metrics collector at the requested prefix
db.compTimeMeter = metrics.NewRegisteredMeter(prefix+"compact/time", nil)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't *Meter fields assignments be under a lock?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think so. Why would they be under a lock? Meter is called once, when you initialise the DB, why would you initialise the same DB under different threads?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As Meter is exported, it can be called any number of times, this is why I suggested not to export it, in the commend bellow.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You're right, Meter should not be exported. It should not be called a number of times, but only once.

db.compReadMeter = metrics.NewRegisteredMeter(prefix+"compact/input", nil)
db.compWriteMeter = metrics.NewRegisteredMeter(prefix+"compact/output", nil)
db.diskReadMeter = metrics.NewRegisteredMeter(prefix+"disk/read", nil)
db.diskWriteMeter = metrics.NewRegisteredMeter(prefix+"disk/write", nil)
db.writeDelayMeter = metrics.NewRegisteredMeter(prefix+"compact/writedelay/duration", nil)
db.writeDelayNMeter = metrics.NewRegisteredMeter(prefix+"compact/writedelay/counter", nil)

// Create a quit channel for the periodic collector and run it
db.quitLock.Lock()
db.quitChan = make(chan chan error)
db.quitLock.Unlock()

go db.meter(10 * time.Second)
}

func (db *DB) meter(refresh time.Duration) {
// Create the counters to store current and previous compaction values
compactions := make([][]float64, 2)
for i := 0; i < 2; i++ {
compactions[i] = make([]float64, 3)
}
// Create storage for iostats.
var iostats [2]float64

// Create storage and warning log tracer for write delay.
var (
delaystats [2]int64
lastWritePaused time.Time
)

var (
errc chan error
merr error
)

// Iterate ad infinitum and collect the stats
for i := 1; errc == nil && merr == nil; i++ {
// Retrieve the database stats
stats, err := db.ldb.GetProperty("leveldb.stats")
if err != nil {
log.Error("Failed to read database stats", "err", err)
merr = err
continue
}
// Find the compaction table, skip the header
lines := strings.Split(stats, "\n")
for len(lines) > 0 && strings.TrimSpace(lines[0]) != "Compactions" {
lines = lines[1:]
}
if len(lines) <= 3 {
log.Error("Compaction table not found")
merr = errors.New("compaction table not found")
continue
}
lines = lines[3:]

// Iterate over all the table rows, and accumulate the entries
for j := 0; j < len(compactions[i%2]); j++ {
compactions[i%2][j] = 0
}
for _, line := range lines {
parts := strings.Split(line, "|")
if len(parts) != 6 {
break
}
for idx, counter := range parts[3:] {
value, err := strconv.ParseFloat(strings.TrimSpace(counter), 64)
if err != nil {
log.Error("Compaction entry parsing failed", "err", err)
merr = err
continue
}
compactions[i%2][idx] += value
}
}
// Update all the requested meters
if db.compTimeMeter != nil {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

All *Meter fields checks should be under a lock.

I have a suggestion, not to export Meter method, just to initialize metrics in NewDB, making them "immutable" from "outside" and to remove the lock completely.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, the problem is that Meter is exported.

db.compTimeMeter.Mark(int64((compactions[i%2][0] - compactions[(i-1)%2][0]) * 1000 * 1000 * 1000))
}
if db.compReadMeter != nil {
db.compReadMeter.Mark(int64((compactions[i%2][1] - compactions[(i-1)%2][1]) * 1024 * 1024))
}
if db.compWriteMeter != nil {
db.compWriteMeter.Mark(int64((compactions[i%2][2] - compactions[(i-1)%2][2]) * 1024 * 1024))
}

// Retrieve the write delay statistic
writedelay, err := db.ldb.GetProperty("leveldb.writedelay")
if err != nil {
log.Error("Failed to read database write delay statistic", "err", err)
merr = err
continue
}
var (
delayN int64
delayDuration string
duration time.Duration
paused bool
)
if n, err := fmt.Sscanf(writedelay, "DelayN:%d Delay:%s Paused:%t", &delayN, &delayDuration, &paused); n != 3 || err != nil {
log.Error("Write delay statistic not found")
merr = err
continue
}
duration, err = time.ParseDuration(delayDuration)
if err != nil {
log.Error("Failed to parse delay duration", "err", err)
merr = err
continue
}
if db.writeDelayNMeter != nil {
db.writeDelayNMeter.Mark(delayN - delaystats[0])
}
if db.writeDelayMeter != nil {
db.writeDelayMeter.Mark(duration.Nanoseconds() - delaystats[1])
}
// If a warning that db is performing compaction has been displayed, any subsequent
// warnings will be withheld for one minute not to overwhelm the user.
if paused && delayN-delaystats[0] == 0 && duration.Nanoseconds()-delaystats[1] == 0 &&
time.Now().After(lastWritePaused.Add(writePauseWarningThrottler)) {
log.Warn("Database compacting, degraded performance")
lastWritePaused = time.Now()
}
delaystats[0], delaystats[1] = delayN, duration.Nanoseconds()

// Retrieve the database iostats.
ioStats, err := db.ldb.GetProperty("leveldb.iostats")
if err != nil {
log.Error("Failed to read database iostats", "err", err)
merr = err
continue
}
var nRead, nWrite float64
parts := strings.Split(ioStats, " ")
if len(parts) < 2 {
log.Error("Bad syntax of ioStats", "ioStats", ioStats)
merr = fmt.Errorf("bad syntax of ioStats %s", ioStats)
continue
}
if n, err := fmt.Sscanf(parts[0], "Read(MB):%f", &nRead); n != 1 || err != nil {
log.Error("Bad syntax of read entry", "entry", parts[0])
merr = err
continue
}
if n, err := fmt.Sscanf(parts[1], "Write(MB):%f", &nWrite); n != 1 || err != nil {
log.Error("Bad syntax of write entry", "entry", parts[1])
merr = err
continue
}
if db.diskReadMeter != nil {
db.diskReadMeter.Mark(int64((nRead - iostats[0]) * 1024 * 1024))
}
if db.diskWriteMeter != nil {
db.diskWriteMeter.Mark(int64((nWrite - iostats[1]) * 1024 * 1024))
}
iostats[0], iostats[1] = nRead, nWrite

// Sleep a bit, then repeat the stats collection
select {
case errc = <-db.quitChan:
// Quit requesting, stop hammering the database
case <-time.After(refresh):
// Timeout, gather a new set of stats
}
}

if errc == nil {
errc = <-db.quitChan
}
errc <- merr
}
6 changes: 3 additions & 3 deletions swarm/shed/db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ func TestDB_persistence(t *testing.T) {
}
defer os.RemoveAll(dir)

db, err := NewDB(dir)
db, err := NewDB(dir, "")
if err != nil {
t.Fatal(err)
}
Expand All @@ -73,7 +73,7 @@ func TestDB_persistence(t *testing.T) {
t.Fatal(err)
}

db2, err := NewDB(dir)
db2, err := NewDB(dir, "")
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -101,7 +101,7 @@ func newTestDB(t *testing.T) (db *DB, cleanupFunc func()) {
t.Fatal(err)
}
cleanupFunc = func() { os.RemoveAll(dir) }
db, err = NewDB(dir)
db, err = NewDB(dir, "")
if err != nil {
cleanupFunc()
t.Fatal(err)
Expand Down
2 changes: 1 addition & 1 deletion swarm/shed/example_store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ type Store struct {
// and possible conflicts with schema from existing database is checked
// automatically.
func New(path string) (s *Store, err error) {
db, err := shed.NewDB(path)
db, err := shed.NewDB(path, "")
if err != nil {
return nil, err
}
Expand Down