-
Notifications
You must be signed in to change notification settings - Fork 110
swarm/shed: add metrics to each shed db #1029
Changes from 2 commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -23,26 +23,48 @@ | |
package shed | ||
|
||
import ( | ||
"errors" | ||
"fmt" | ||
"strconv" | ||
"strings" | ||
"sync" | ||
"time" | ||
|
||
"github.com/ethereum/go-ethereum/metrics" | ||
"github.com/ethereum/go-ethereum/swarm/log" | ||
"github.com/syndtr/goleveldb/leveldb" | ||
"github.com/syndtr/goleveldb/leveldb/iterator" | ||
"github.com/syndtr/goleveldb/leveldb/opt" | ||
) | ||
|
||
const (
	// openFileLimit caps LevelDB's OpenFilesCacheCapacity.
	openFileLimit = 128

	// writePauseWarningThrottler is the minimum interval between
	// consecutive "database compacting" warnings, so the log is not
	// flooded while a long compaction is in progress.
	writePauseWarningThrottler = 1 * time.Minute
)
|
||
// DB provides abstractions over LevelDB in order to
// implement complex structures using fields and ordered indexes.
// It provides a schema functionality to store fields and indexes
// information about naming and types.
type DB struct {
	ldb *leveldb.DB // underlying LevelDB handle

	// Metrics collectors for leveldb-internal statistics. They are set by
	// Meter and read (with nil checks) by the meter goroutine; they stay
	// nil if Meter is never called.
	compTimeMeter    metrics.Meter // Meter for measuring the total time spent in database compaction
	compReadMeter    metrics.Meter // Meter for measuring the data read during compaction
	compWriteMeter   metrics.Meter // Meter for measuring the data written during compaction
	writeDelayNMeter metrics.Meter // Meter for measuring the write delay number due to database compaction
	writeDelayMeter  metrics.Meter // Meter for measuring the write delay duration due to database compaction
	diskReadMeter    metrics.Meter // Meter for measuring the effective amount of data read
	diskWriteMeter   metrics.Meter // Meter for measuring the effective amount of data written

	quitLock sync.Mutex      // Mutex protecting the quit channel access
	quitChan chan chan error // Quit channel to stop the metrics collection before closing the database
}
|
||
// NewDB constructs a new DB and validates the schema | ||
// if it exists in database on the given path. | ||
func NewDB(path string) (db *DB, err error) { | ||
// prefix is used for metrics collection for the given DB. | ||
func NewDB(path string, prefix string) (db *DB, err error) { | ||
ldb, err := leveldb.OpenFile(path, &opt.Options{ | ||
OpenFilesCacheCapacity: openFileLimit, | ||
}) | ||
|
@@ -66,6 +88,9 @@ func NewDB(path string) (db *DB, err error) { | |
return nil, err | ||
} | ||
} | ||
|
||
db.Meter(prefix) | ||
|
||
return db, nil | ||
} | ||
|
||
|
@@ -128,3 +153,179 @@ func (db *DB) WriteBatch(batch *leveldb.Batch) (err error) { | |
func (db *DB) Close() (err error) { | ||
return db.ldb.Close() | ||
} | ||
|
||
// Meter configures the database metrics collectors | ||
func (db *DB) Meter(prefix string) { | ||
// Initialize all the metrics collector at the requested prefix | ||
db.compTimeMeter = metrics.NewRegisteredMeter(prefix+"compact/time", nil) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Shouldn't *Meter fields assignments be under a lock? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I don't think so. Why would they be under a lock? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. As Meter is exported, it can be called any number of times, this is why I suggested not to export it, in the commend bellow. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. You're right, |
||
db.compReadMeter = metrics.NewRegisteredMeter(prefix+"compact/input", nil) | ||
db.compWriteMeter = metrics.NewRegisteredMeter(prefix+"compact/output", nil) | ||
db.diskReadMeter = metrics.NewRegisteredMeter(prefix+"disk/read", nil) | ||
db.diskWriteMeter = metrics.NewRegisteredMeter(prefix+"disk/write", nil) | ||
db.writeDelayMeter = metrics.NewRegisteredMeter(prefix+"compact/writedelay/duration", nil) | ||
db.writeDelayNMeter = metrics.NewRegisteredMeter(prefix+"compact/writedelay/counter", nil) | ||
|
||
// Create a quit channel for the periodic collector and run it | ||
db.quitLock.Lock() | ||
db.quitChan = make(chan chan error) | ||
db.quitLock.Unlock() | ||
|
||
go db.meter(10 * time.Second) | ||
} | ||
|
||
// meter runs in its own goroutine and periodically (every refresh) reads
// leveldb's internal statistics properties, parses them and reports the
// deltas to the registered metrics collectors. It loops until either a
// stop request arrives on db.quitChan or a collection error occurs; in
// both cases it finally reports the terminating error (or nil) back on
// the channel received from quitChan.
func (db *DB) meter(refresh time.Duration) {
	// Create the counters to store current and previous compaction values.
	// Two rows are kept (double-buffered via i%2) so each cycle can report
	// the delta against the previous cycle's totals.
	compactions := make([][]float64, 2)
	for i := 0; i < 2; i++ {
		compactions[i] = make([]float64, 3)
	}
	// Create storage for iostats (previous cycle's read/write MB totals).
	var iostats [2]float64

	// Create storage and warning log tracer for write delay.
	var (
		delaystats      [2]int64  // previous cycle's delay count and duration
		lastWritePaused time.Time // last time the "compacting" warning was logged
	)

	var (
		errc chan error // reply channel received from quitChan, nil until stop is requested
		merr error      // first collection error; non-nil terminates the loop
	)

	// Iterate ad infinitum and collect the stats. Any error sets merr,
	// which ends the loop on the next condition check; the goroutine then
	// blocks below until Close requests a stop.
	for i := 1; errc == nil && merr == nil; i++ {
		// Retrieve the database stats
		stats, err := db.ldb.GetProperty("leveldb.stats")
		if err != nil {
			log.Error("Failed to read database stats", "err", err)
			merr = err
			continue
		}
		// Find the compaction table, skip the header
		lines := strings.Split(stats, "\n")
		for len(lines) > 0 && strings.TrimSpace(lines[0]) != "Compactions" {
			lines = lines[1:]
		}
		if len(lines) <= 3 {
			log.Error("Compaction table not found")
			merr = errors.New("compaction table not found")
			continue
		}
		lines = lines[3:]

		// Iterate over all the table rows, and accumulate the entries.
		// Reset this cycle's buffer first.
		for j := 0; j < len(compactions[i%2]); j++ {
			compactions[i%2][j] = 0
		}
		for _, line := range lines {
			parts := strings.Split(line, "|")
			if len(parts) != 6 {
				break
			}
			// parts[3:] are assumed to be the time/read/write columns of
			// goleveldb's compaction table — TODO(review) confirm against
			// the goleveldb stats format.
			for idx, counter := range parts[3:] {
				value, err := strconv.ParseFloat(strings.TrimSpace(counter), 64)
				if err != nil {
					log.Error("Compaction entry parsing failed", "err", err)
					merr = err
					// NOTE(review): this continue only skips the current
					// counter of the inner loop; the cycle still reports
					// partially accumulated values before merr ends the
					// outer loop.
					continue
				}
				compactions[i%2][idx] += value
			}
		}
		// Update all the requested meters with the delta to the previous
		// cycle; a nil meter means Meter was not called for that metric.
		if db.compTimeMeter != nil {
			db.compTimeMeter.Mark(int64((compactions[i%2][0] - compactions[(i-1)%2][0]) * 1000 * 1000 * 1000))
		}
		if db.compReadMeter != nil {
			db.compReadMeter.Mark(int64((compactions[i%2][1] - compactions[(i-1)%2][1]) * 1024 * 1024))
		}
		if db.compWriteMeter != nil {
			db.compWriteMeter.Mark(int64((compactions[i%2][2] - compactions[(i-1)%2][2]) * 1024 * 1024))
		}

		// Retrieve the write delay statistic
		writedelay, err := db.ldb.GetProperty("leveldb.writedelay")
		if err != nil {
			log.Error("Failed to read database write delay statistic", "err", err)
			merr = err
			continue
		}
		var (
			delayN        int64         // cumulative number of write delays
			delayDuration string        // cumulative delay duration, as text
			duration      time.Duration // parsed form of delayDuration
			paused        bool          // whether writes are currently paused
		)
		if n, err := fmt.Sscanf(writedelay, "DelayN:%d Delay:%s Paused:%t", &delayN, &delayDuration, &paused); n != 3 || err != nil {
			log.Error("Write delay statistic not found")
			// NOTE(review): if Sscanf matched fewer than 3 items without
			// returning an error, merr is set to nil here and the loop
			// keeps running — verify this is intended.
			merr = err
			continue
		}
		duration, err = time.ParseDuration(delayDuration)
		if err != nil {
			log.Error("Failed to parse delay duration", "err", err)
			merr = err
			continue
		}
		if db.writeDelayNMeter != nil {
			db.writeDelayNMeter.Mark(delayN - delaystats[0])
		}
		if db.writeDelayMeter != nil {
			db.writeDelayMeter.Mark(duration.Nanoseconds() - delaystats[1])
		}
		// If a warning that db is performing compaction has been displayed, any subsequent
		// warnings will be withheld for one minute not to overwhelm the user.
		if paused && delayN-delaystats[0] == 0 && duration.Nanoseconds()-delaystats[1] == 0 &&
			time.Now().After(lastWritePaused.Add(writePauseWarningThrottler)) {
			log.Warn("Database compacting, degraded performance")
			lastWritePaused = time.Now()
		}
		delaystats[0], delaystats[1] = delayN, duration.Nanoseconds()

		// Retrieve the database iostats.
		ioStats, err := db.ldb.GetProperty("leveldb.iostats")
		if err != nil {
			log.Error("Failed to read database iostats", "err", err)
			merr = err
			continue
		}
		var nRead, nWrite float64
		parts := strings.Split(ioStats, " ")
		if len(parts) < 2 {
			log.Error("Bad syntax of ioStats", "ioStats", ioStats)
			merr = fmt.Errorf("bad syntax of ioStats %s", ioStats)
			continue
		}
		if n, err := fmt.Sscanf(parts[0], "Read(MB):%f", &nRead); n != 1 || err != nil {
			log.Error("Bad syntax of read entry", "entry", parts[0])
			merr = err
			continue
		}
		if n, err := fmt.Sscanf(parts[1], "Write(MB):%f", &nWrite); n != 1 || err != nil {
			log.Error("Bad syntax of write entry", "entry", parts[1])
			merr = err
			continue
		}
		if db.diskReadMeter != nil {
			db.diskReadMeter.Mark(int64((nRead - iostats[0]) * 1024 * 1024))
		}
		if db.diskWriteMeter != nil {
			db.diskWriteMeter.Mark(int64((nWrite - iostats[1]) * 1024 * 1024))
		}
		iostats[0], iostats[1] = nRead, nWrite

		// Sleep a bit, then repeat the stats collection
		select {
		case errc = <-db.quitChan:
			// Quit requesting, stop hammering the database
		case <-time.After(refresh):
			// Timeout, gather a new set of stats
		}
	}

	// If the loop ended because of an error (rather than a quit request),
	// wait here for the stop request so the error can be reported back.
	if errc == nil {
		errc = <-db.quitChan
	}
	errc <- merr
}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I would just rename
prefix
to metricsPrefix, as it makes clearer what it is for. It is also fine to make metrics optional: instead of calling db.Meter in NewDB, call it explicitly only in the implementations that need it. In that case the prefix argument is not needed in NewDB.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think there is no harm calling it for every single LevelDB, as metrics are gathered only once every 10sec. and AFAIK LevelDB also collects them always.