swarm/shed: add metrics to each shed db #18277

Merged 4 commits on Dec 12, 2018
205 changes: 202 additions & 3 deletions swarm/shed/db.go
@@ -23,26 +23,46 @@
package shed

import (
"errors"
"fmt"
"strconv"
"strings"
"time"

"github.com/ethereum/go-ethereum/metrics"
"github.com/ethereum/go-ethereum/swarm/log"
"github.com/syndtr/goleveldb/leveldb"
"github.com/syndtr/goleveldb/leveldb/iterator"
"github.com/syndtr/goleveldb/leveldb/opt"
)

-// The limit for LevelDB OpenFilesCacheCapacity.
-const openFileLimit = 128
+const (
+	openFileLimit              = 128 // The limit for LevelDB OpenFilesCacheCapacity.
+	writePauseWarningThrottler = 1 * time.Minute
+)

// DB provides abstractions over LevelDB in order to
// implement complex structures using fields and ordered indexes.
// It provides a schema functionality to store fields and indexes
// information about naming and types.
type DB struct {
ldb *leveldb.DB

compTimeMeter metrics.Meter // Meter for measuring the total time spent in database compaction
compReadMeter metrics.Meter // Meter for measuring the data read during compaction
compWriteMeter metrics.Meter // Meter for measuring the data written during compaction
writeDelayNMeter metrics.Meter // Meter for measuring the write delay number due to database compaction
writeDelayMeter metrics.Meter // Meter for measuring the write delay duration due to database compaction
diskReadMeter metrics.Meter // Meter for measuring the effective amount of data read
diskWriteMeter metrics.Meter // Meter for measuring the effective amount of data written

quitChan chan chan error // Quit channel to stop the metrics collection before closing the database
}

// NewDB constructs a new DB and validates the schema
// if it exists in database on the given path.
-func NewDB(path string) (db *DB, err error) {
+// metricsPrefix is used for metrics collection for the given DB.
+func NewDB(path string, metricsPrefix string) (db *DB, err error) {
ldb, err := leveldb.OpenFile(path, &opt.Options{
OpenFilesCacheCapacity: openFileLimit,
})
@@ -66,6 +86,15 @@ func NewDB(path string) (db *DB, err error) {
return nil, err
}
}

// Configure meters for DB
db.configure(metricsPrefix)

// Create a quit channel for the periodic metrics collector and run it
db.quitChan = make(chan chan error)

go db.meter(10 * time.Second)

return db, nil
}
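The only API change for callers is the new metricsPrefix argument. A minimal usage sketch, not part of this diff: the "swarm/shed/example/" prefix and the temporary path are made up, and it assumes metrics collection is switched on (go-ethereum meters are created as no-op NilMeters while metrics.Enabled is false):

package main

import (
	"fmt"
	"log"

	"github.com/ethereum/go-ethereum/metrics"
	"github.com/ethereum/go-ethereum/swarm/shed"
)

func main() {
	metrics.Enabled = true // must be set before NewDB creates the meters

	// Hypothetical prefix; each shed DB should get its own so that meter
	// names such as "swarm/shed/example/compact/time" stay unique.
	db, err := shed.NewDB("/tmp/shed-example", "swarm/shed/example/")
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	// The meters land in the default registry under the given prefix.
	if m, ok := metrics.DefaultRegistry.Get("swarm/shed/example/compact/time").(metrics.Meter); ok {
		fmt.Println("compaction time rate (1m):", m.Rate1())
	}
}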

@@ -126,5 +155,175 @@ func (db *DB) WriteBatch(batch *leveldb.Batch) (err error) {

// Close closes LevelDB database.
func (db *DB) Close() (err error) {
close(db.quitChan)
return db.ldb.Close()
}

// configure configures the database metrics collectors
func (db *DB) configure(prefix string) {
// Initialize all the metrics collector at the requested prefix
db.compTimeMeter = metrics.NewRegisteredMeter(prefix+"compact/time", nil)
db.compReadMeter = metrics.NewRegisteredMeter(prefix+"compact/input", nil)
db.compWriteMeter = metrics.NewRegisteredMeter(prefix+"compact/output", nil)
db.diskReadMeter = metrics.NewRegisteredMeter(prefix+"disk/read", nil)
db.diskWriteMeter = metrics.NewRegisteredMeter(prefix+"disk/write", nil)
db.writeDelayMeter = metrics.NewRegisteredMeter(prefix+"compact/writedelay/duration", nil)
db.writeDelayNMeter = metrics.NewRegisteredMeter(prefix+"compact/writedelay/counter", nil)
}

func (db *DB) meter(refresh time.Duration) {
// Create the counters to store current and previous compaction values
compactions := make([][]float64, 2)
for i := 0; i < 2; i++ {
compactions[i] = make([]float64, 3)
}
// Create storage for iostats.
var iostats [2]float64

// Create storage and warning log tracer for write delay.
var (
delaystats [2]int64
lastWritePaused time.Time
)

var (
errc chan error
merr error
)

// Iterate ad infinitum and collect the stats
for i := 1; errc == nil && merr == nil; i++ {
// Retrieve the database stats
stats, err := db.ldb.GetProperty("leveldb.stats")
if err != nil {
log.Error("Failed to read database stats", "err", err)
merr = err
continue
}
// Find the compaction table, skip the header
lines := strings.Split(stats, "\n")
for len(lines) > 0 && strings.TrimSpace(lines[0]) != "Compactions" {
lines = lines[1:]
}
if len(lines) <= 3 {
log.Error("Compaction table not found")
merr = errors.New("compaction table not found")
continue
}
lines = lines[3:]

// Iterate over all the table rows, and accumulate the entries
for j := 0; j < len(compactions[i%2]); j++ {
compactions[i%2][j] = 0
}
for _, line := range lines {
parts := strings.Split(line, "|")
if len(parts) != 6 {
break
}
for idx, counter := range parts[3:] {
value, err := strconv.ParseFloat(strings.TrimSpace(counter), 64)
if err != nil {
log.Error("Compaction entry parsing failed", "err", err)
merr = err
continue
}
compactions[i%2][idx] += value
}
}
// Update all the requested meters
if db.compTimeMeter != nil {
db.compTimeMeter.Mark(int64((compactions[i%2][0] - compactions[(i-1)%2][0]) * 1000 * 1000 * 1000))
}
if db.compReadMeter != nil {
db.compReadMeter.Mark(int64((compactions[i%2][1] - compactions[(i-1)%2][1]) * 1024 * 1024))
}
if db.compWriteMeter != nil {
db.compWriteMeter.Mark(int64((compactions[i%2][2] - compactions[(i-1)%2][2]) * 1024 * 1024))
}

// Retrieve the write delay statistic
writedelay, err := db.ldb.GetProperty("leveldb.writedelay")
if err != nil {
log.Error("Failed to read database write delay statistic", "err", err)
merr = err
continue
}
var (
delayN int64
delayDuration string
duration time.Duration
paused bool
)
if n, err := fmt.Sscanf(writedelay, "DelayN:%d Delay:%s Paused:%t", &delayN, &delayDuration, &paused); n != 3 || err != nil {
log.Error("Write delay statistic not found")
merr = err
continue
}
duration, err = time.ParseDuration(delayDuration)
if err != nil {
log.Error("Failed to parse delay duration", "err", err)
merr = err
continue
}
if db.writeDelayNMeter != nil {
db.writeDelayNMeter.Mark(delayN - delaystats[0])
}
if db.writeDelayMeter != nil {
db.writeDelayMeter.Mark(duration.Nanoseconds() - delaystats[1])
}
// If a warning that db is performing compaction has been displayed, any subsequent
// warnings will be withheld for one minute not to overwhelm the user.
if paused && delayN-delaystats[0] == 0 && duration.Nanoseconds()-delaystats[1] == 0 &&
time.Now().After(lastWritePaused.Add(writePauseWarningThrottler)) {
log.Warn("Database compacting, degraded performance")
lastWritePaused = time.Now()
}
delaystats[0], delaystats[1] = delayN, duration.Nanoseconds()

// Retrieve the database iostats.
ioStats, err := db.ldb.GetProperty("leveldb.iostats")
if err != nil {
log.Error("Failed to read database iostats", "err", err)
merr = err
continue
}
var nRead, nWrite float64
parts := strings.Split(ioStats, " ")
if len(parts) < 2 {
log.Error("Bad syntax of ioStats", "ioStats", ioStats)
merr = fmt.Errorf("bad syntax of ioStats %s", ioStats)
continue
}
if n, err := fmt.Sscanf(parts[0], "Read(MB):%f", &nRead); n != 1 || err != nil {
log.Error("Bad syntax of read entry", "entry", parts[0])
merr = err
continue
}
if n, err := fmt.Sscanf(parts[1], "Write(MB):%f", &nWrite); n != 1 || err != nil {
log.Error("Bad syntax of write entry", "entry", parts[1])
merr = err
continue
}
if db.diskReadMeter != nil {
db.diskReadMeter.Mark(int64((nRead - iostats[0]) * 1024 * 1024))
}
if db.diskWriteMeter != nil {
db.diskWriteMeter.Mark(int64((nWrite - iostats[1]) * 1024 * 1024))
}
iostats[0], iostats[1] = nRead, nWrite

// Sleep a bit, then repeat the stats collection
select {
case errc = <-db.quitChan:
// Quit requesting, stop hammering the database
case <-time.After(refresh):
// Timeout, gather a new set of stats
}
}

if errc == nil {
errc = <-db.quitChan
}
errc <- merr
}
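The collector above depends on the textual formats of LevelDB's "leveldb.writedelay" and "leveldb.iostats" properties. A standalone sketch of just those parsing steps, with made-up property values standing in for what db.ldb.GetProperty returns:

package main

import (
	"fmt"
	"strings"
	"time"
)

func main() {
	// Made-up property values; in db.meter these strings come from
	// db.ldb.GetProperty("leveldb.writedelay") and
	// db.ldb.GetProperty("leveldb.iostats").
	writedelay := "DelayN:5 Delay:1.5s Paused:false"
	ioStats := "Read(MB):42.5 Write(MB):13.0"

	// Parse the write delay triple exactly as the collector does.
	var (
		delayN        int64
		delayDuration string
		paused        bool
	)
	if n, err := fmt.Sscanf(writedelay, "DelayN:%d Delay:%s Paused:%t", &delayN, &delayDuration, &paused); n != 3 || err != nil {
		panic(err)
	}
	duration, err := time.ParseDuration(delayDuration)
	if err != nil {
		panic(err)
	}

	// Parse the cumulative disk I/O counters, reported in megabytes.
	var nRead, nWrite float64
	parts := strings.Split(ioStats, " ")
	if n, err := fmt.Sscanf(parts[0], "Read(MB):%f", &nRead); n != 1 || err != nil {
		panic(err)
	}
	if n, err := fmt.Sscanf(parts[1], "Write(MB):%f", &nWrite); n != 1 || err != nil {
		panic(err)
	}

	fmt.Println(delayN, duration, paused) // 5 1.5s false
	fmt.Println(nRead, nWrite)            // 42.5 13
}

The compaction table from "leveldb.stats" is handled in the same spirit: the pipe-separated rows are summed into a two-generation buffer, and each meter is marked with the delta between the current and the previous generation.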
6 changes: 3 additions & 3 deletions swarm/shed/db_test.go
@@ -55,7 +55,7 @@ func TestDB_persistence(t *testing.T) {
}
defer os.RemoveAll(dir)

-db, err := NewDB(dir)
+db, err := NewDB(dir, "")
if err != nil {
t.Fatal(err)
}
@@ -73,7 +73,7 @@ func TestDB_persistence(t *testing.T) {
t.Fatal(err)
}

-db2, err := NewDB(dir)
+db2, err := NewDB(dir, "")
if err != nil {
t.Fatal(err)
}
@@ -101,7 +101,7 @@ func newTestDB(t *testing.T) (db *DB, cleanupFunc func()) {
t.Fatal(err)
}
cleanupFunc = func() { os.RemoveAll(dir) }
-db, err = NewDB(dir)
+db, err = NewDB(dir, "")
if err != nil {
cleanupFunc()
t.Fatal(err)
2 changes: 1 addition & 1 deletion swarm/shed/example_store_test.go
@@ -52,7 +52,7 @@ type Store struct {
// and possible conflicts with schema from existing database is checked
// automatically.
func New(path string) (s *Store, err error) {
-db, err := shed.NewDB(path)
+db, err := shed.NewDB(path, "")
if err != nil {
return nil, err
}
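The tests and the example store pass an empty prefix and so opt out of namespacing, registering their meters under bare names such as "compact/time". A caller opening several shed databases would instead forward a distinct prefix per database; a minimal sketch under that assumption (the NewWithPrefix name is hypothetical, and the field and index setup performed by New is elided):

// NewWithPrefix is a hypothetical variant of New above that forwards a
// per-database metrics prefix to shed.NewDB.
func NewWithPrefix(path, metricsPrefix string) (s *Store, err error) {
	db, err := shed.NewDB(path, metricsPrefix)
	if err != nil {
		return nil, err
	}
	return &Store{db: db}, nil
}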