Skip to content
This repository has been archived by the owner on Aug 2, 2021. It is now read-only.

Swap metrics reporter #1004

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 34 additions & 10 deletions p2p/protocols/accounting.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,29 +16,33 @@

package protocols

import "github.com/ethereum/go-ethereum/metrics"
import (
"time"

"github.com/ethereum/go-ethereum/metrics"
"github.com/syndtr/goleveldb/leveldb"
)

//define some metrics
var (
//NOTE: these metrics just define the interfaces and are currently *NOT persisted* over sessions
//All metrics are cumulative

//total amount of units credited
mBalanceCredit = metrics.NewRegisteredCounterForced("account.balance.credit", nil)
mBalanceCredit metrics.Counter
//total amount of units debited
mBalanceDebit = metrics.NewRegisteredCounterForced("account.balance.debit", nil)
mBalanceDebit metrics.Counter
//total amount of bytes credited
mBytesCredit = metrics.NewRegisteredCounterForced("account.bytes.credit", nil)
mBytesCredit metrics.Counter
//total amount of bytes debited
mBytesDebit = metrics.NewRegisteredCounterForced("account.bytes.debit", nil)
mBytesDebit metrics.Counter
//total amount of credited messages
mMsgCredit = metrics.NewRegisteredCounterForced("account.msg.credit", nil)
mMsgCredit metrics.Counter
//total amount of debited messages
mMsgDebit = metrics.NewRegisteredCounterForced("account.msg.debit", nil)
mMsgDebit metrics.Counter
//how many times local node had to drop remote peers
mPeerDrops = metrics.NewRegisteredCounterForced("account.peerdrops", nil)
mPeerDrops metrics.Counter
//how many times local node overdrafted and dropped
mSelfDrops = metrics.NewRegisteredCounterForced("account.selfdrops", nil)
mSelfDrops metrics.Counter
)

//Prices defines how prices are being passed on to the accounting instance
Expand Down Expand Up @@ -105,6 +109,26 @@ func NewAccounting(balance Balance, po Prices) *Accounting {
return ah
}

//SetupAccountingMetrics creates a separate registry for p2p accounting metrics;
//this registry should be independent of any other metrics as it persists at different endpoints.
//It also instantiates the given metrics and starts the persisting go-routine which
//at the passed interval writes the metrics to a LevelDB
func SetupAccountingMetrics(reportInterval time.Duration, path string) *leveldb.DB {
//create an empty registry
registry := metrics.NewRegistry()
//instantiate the metrics
mBalanceCredit = metrics.NewRegisteredCounterForced("account.balance.credit", registry)
mBalanceDebit = metrics.NewRegisteredCounterForced("account.balance.debit", registry)
mBytesCredit = metrics.NewRegisteredCounterForced("account.bytes.credit", registry)
mBytesDebit = metrics.NewRegisteredCounterForced("account.bytes.debit", registry)
mMsgCredit = metrics.NewRegisteredCounterForced("account.msg.credit", registry)
mMsgDebit = metrics.NewRegisteredCounterForced("account.msg.debit", registry)
mPeerDrops = metrics.NewRegisteredCounterForced("account.peerdrops", registry)
mSelfDrops = metrics.NewRegisteredCounterForced("account.selfdrops", registry)
//create the DB and start persisting
return NewMetricsDB(registry, reportInterval, path)
}

//Implement Hook.Send
// Send takes a peer, a size and a msg and
// - calculates the cost for the local node sending a msg of size to peer using the Prices interface
Expand Down
134 changes: 134 additions & 0 deletions p2p/protocols/reporter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
// Copyright 2018 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.

package protocols

import (
"encoding/binary"
"time"

"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/metrics"

"github.com/syndtr/goleveldb/leveldb"
)

//reporter is an internal structure used to write p2p accounting related
//metrics to a LevelDB. It will periodically write the accrued metrics to the DB.
type reporter struct {
reg metrics.Registry //the registry for these metrics (independent of other metrics)
interval time.Duration //duration at which the reporter will persist metrics
db *leveldb.DB //the actual DB
}

//NewMetricsDB creates a new LevelDB instance used to persist metrics defined
//inside p2p/protocols/accounting.go
func NewMetricsDB(r metrics.Registry, d time.Duration, path string) *leveldb.DB {
holisticode marked this conversation as resolved.
Show resolved Hide resolved
var val = make([]byte, 8)
var err error

//Create the LevelDB
db, err := leveldb.OpenFile(path, nil)
if err != nil {
log.Error(err.Error())
return nil
}

//Check for all defined metrics that there is a value in the DB
//If there is, assign it to the metric. This means that the node
//has been running before and that metrics have been persisted.
val, err = db.Get([]byte("account.balance.credit"), nil)
holisticode marked this conversation as resolved.
Show resolved Hide resolved
if err == nil {
mBalanceCredit.Inc(int64(binary.BigEndian.Uint64(val)))
}
val, err = db.Get([]byte("account.balance.debit"), nil)
if err == nil {
mBalanceDebit.Inc(int64(binary.BigEndian.Uint64(val)))
}
val, err = db.Get([]byte("account.bytes.credit"), nil)
if err == nil {
mBytesCredit.Inc(int64(binary.BigEndian.Uint64(val)))
}
val, err = db.Get([]byte("account.bytes.debit"), nil)
if err == nil {
mBytesDebit.Inc(int64(binary.BigEndian.Uint64(val)))
}
val, err = db.Get([]byte("account.msg.credit"), nil)
if err == nil {
mMsgCredit.Inc(int64(binary.BigEndian.Uint64(val)))
}
val, err = db.Get([]byte("account.msg.debit"), nil)
if err == nil {
mMsgDebit.Inc(int64(binary.BigEndian.Uint64(val)))
}
val, err = db.Get([]byte("account.peerdrops"), nil)
if err == nil {
mPeerDrops.Inc(int64(binary.BigEndian.Uint64(val)))
}
val, err = db.Get([]byte("account.selfdrops"), nil)
if err == nil {
mSelfDrops.Inc(int64(binary.BigEndian.Uint64(val)))
}

//create the reporter
reg := &reporter{
reg: r,
interval: d,
db: db,
}

//run the go routine
go reg.run()

return db

}

//run is the go routine which periodically sends the metrics to the configued LevelDB
func (r *reporter) run() {
intervalTicker := time.NewTicker(r.interval)

for _ = range intervalTicker.C {
//at each tick send the metrics
if err := r.send(); err != nil {
log.Error("unable to send metrics to LevelDB. err=%v", "err", err)
holisticode marked this conversation as resolved.
Show resolved Hide resolved
//If there is an error in writing, exit the routine; we assume here that the error is
//severe and don't attempt to write again.
//Also, this should prevent leaking when the node is stopped
holisticode marked this conversation as resolved.
Show resolved Hide resolved
return
}
}
}

//send the metrics to the DB
func (r *reporter) send() error {
var err error
//for each metric in the registry (which is independent)...
r.reg.Each(func(name string, i interface{}) {
switch metric := i.(type) {
//assuming every metric here to be a Counter (separate registry)
case metrics.Counter:
//...create a snapshot...
ms := metric.Snapshot()
byteVal := make([]byte, 8)
binary.BigEndian.PutUint64(byteVal, uint64(ms.Count()))
//...and save the value to the DB
err = r.db.Put([]byte(name), byteVal, nil)
default:
}
})
return err
}
76 changes: 76 additions & 0 deletions p2p/protocols/reporter_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
// Copyright 2018 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.

package protocols

import (
"io/ioutil"
"os"
"path/filepath"
"testing"
"time"

"github.com/ethereum/go-ethereum/log"
)

//TestReporter tests that the metrics being collected for p2p accounting
//are being persisted and available after restart of a node.
//It simulates restarting by just recreating the DB as if the node had restarted.
func TestReporter(t *testing.T) {
//create a test directory
dir, err := ioutil.TempDir("", "reporter-test")
if err != nil {
t.Fatal(err)
}
defer os.RemoveAll(dir)

//setup the metrics
log.Debug("Setting up metrics first time")
reportInterval := 100 * time.Millisecond
db := SetupAccountingMetrics(reportInterval, filepath.Join(dir, "test.db"))
log.Debug("Done.")

//do some metrics
mBalanceCredit.Inc(12)
mBytesCredit.Inc(34)
mMsgDebit.Inc(9)

//give the reporter time to write the metrics to DB
time.Sleep(500 * time.Millisecond)
holisticode marked this conversation as resolved.
Show resolved Hide resolved

//set the metrics to nil - this effectively simulates the node having shut down...
mBalanceCredit = nil
mBytesCredit = nil
mMsgDebit = nil
//close the DB also, or we can't create a new one
db.Close()

//setup the metrics again
log.Debug("Setting up metrics second time")
SetupAccountingMetrics(reportInterval, filepath.Join(dir, "test.db"))
log.Debug("Done.")

//now check the metrics, they should have the same value as before "shutdown"
if mBalanceCredit.Count() != 12 {
t.Fatalf("Expected counter to be %d, but is %d", 12, mBalanceCredit.Count())
}
if mBytesCredit.Count() != 34 {
t.Fatalf("Expected counter to be %d, but is %d", 23, mBytesCredit.Count())
}
if mMsgDebit.Count() != 9 {
t.Fatalf("Expected counter to be %d, but is %d", 9, mMsgDebit.Count())
}
}
5 changes: 5 additions & 0 deletions swarm/swap/swap.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,3 +91,8 @@ func (s *Swap) loadState(peer *protocols.Peer) (err error) {
}
return
}

//Clean up Swap
func (swap *Swap) Close() {
swap.stateStore.Close()
}
43 changes: 28 additions & 15 deletions swarm/swarm.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2016 The go-ethereum Authors
// Copyright 2018 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
Expand Down Expand Up @@ -53,6 +53,7 @@ import (
"github.com/ethereum/go-ethereum/swarm/storage/mock"
"github.com/ethereum/go-ethereum/swarm/swap"
"github.com/ethereum/go-ethereum/swarm/tracing"
"github.com/syndtr/goleveldb/leveldb"
)

var (
Expand All @@ -66,20 +67,22 @@ var (

// the swarm stack
type Swarm struct {
config *api.Config // swarm configuration
api *api.API // high level api layer (fs/manifest)
dns api.Resolver // DNS registrar
fileStore *storage.FileStore // distributed preimage archive, the local API to the storage with document level storage/retrieval support
streamer *stream.Registry
bzz *network.Bzz // the logistic manager
backend chequebook.Backend // simple blockchain Backend
privateKey *ecdsa.PrivateKey
corsString string
swapEnabled bool
netStore *storage.NetStore
sfs *fuse.SwarmFS // need this to cleanup all the active mounts on node exit
ps *pss.Pss
swap *swap.Swap
config *api.Config // swarm configuration
api *api.API // high level api layer (fs/manifest)
dns api.Resolver // DNS registrar
fileStore *storage.FileStore // distributed preimage archive, the local API to the storage with document level storage/retrieval support
streamer *stream.Registry
bzz *network.Bzz // the logistic manager
backend chequebook.Backend // simple blockchain Backend
privateKey *ecdsa.PrivateKey
corsString string
swapEnabled bool
netStore *storage.NetStore
sfs *fuse.SwarmFS // need this to cleanup all the active mounts on node exit
ps *pss.Pss
swap *swap.Swap
stateStore *state.DBStore
metricsStore *leveldb.DB
holisticode marked this conversation as resolved.
Show resolved Hide resolved

tracerClose io.Closer
}
Expand Down Expand Up @@ -179,6 +182,7 @@ func NewSwarm(config *api.Config, mockStore *mock.NodeStore) (self *Swarm, err e
return nil, err
}
self.swap = swap.New(balancesStore)
self.metricsStore = protocols.SetupAccountingMetrics(10*time.Second, filepath.Join(config.Path, "metrics.db"))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think SetupAccountingMetrics is doing too many things - creating/loading the db into the registry and also firing up the reporter (currently set to every 10 seconds.). We should probably think about splitting this up, but it also works as is.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's actually NewAccountingMetrics that is doing both things, so I vote for keeping SetupAccountingMetrics as is

}

var nodeID enode.ID
Expand Down Expand Up @@ -446,6 +450,15 @@ func (self *Swarm) Stop() error {
ch.Stop()
ch.Save()
}
if self.swap != nil {
self.swap.Close()
}
if self.metricsStore != nil {
holisticode marked this conversation as resolved.
Show resolved Hide resolved
self.metricsStore.Close()
}
if self.stateStore != nil {
self.stateStore.Close()
}

if self.netStore != nil {
self.netStore.Close()
Expand Down