Skip to content

Commit

Permalink
core, metrics: switch some invalid counters to gauges (ethereum#20047)
Browse files Browse the repository at this point in the history
  • Loading branch information
gzliudan committed May 10, 2024
1 parent 67825d8 commit 742a7f9
Show file tree
Hide file tree
Showing 2 changed files with 58 additions and 20 deletions.
40 changes: 20 additions & 20 deletions core/tx_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,9 +110,9 @@ var (
invalidTxMeter = metrics.NewRegisteredMeter("txpool/invalid", nil)
underpricedTxMeter = metrics.NewRegisteredMeter("txpool/underpriced", nil)

pendingCounter = metrics.NewRegisteredCounter("txpool/pending", nil)
queuedCounter = metrics.NewRegisteredCounter("txpool/queued", nil)
localCounter = metrics.NewRegisteredCounter("txpool/local", nil)
pendingGauge = metrics.NewRegisteredGauge("txpool/pending", nil)
queuedGauge = metrics.NewRegisteredGauge("txpool/queued", nil)
localGauge = metrics.NewRegisteredGauge("txpool/local", nil)
)

// TxStatus is the current status of a transaction as seen by the pool.
Expand Down Expand Up @@ -730,7 +730,7 @@ func (pool *TxPool) add(tx *types.Transaction, local bool) (replaced bool, err e
}
}
if local || pool.locals.contains(from) {
localCounter.Inc(1)
localGauge.Inc(1)
}
pool.journalTx(from, tx)

Expand Down Expand Up @@ -760,7 +760,7 @@ func (pool *TxPool) enqueueTx(hash common.Hash, tx *types.Transaction) (bool, er
queuedReplaceMeter.Mark(1)
} else {
// Nothing was replaced, bump the queued counter
queuedCounter.Inc(1)
queuedGauge.Inc(1)
}
if pool.all.Get(hash) == nil {
pool.all.Add(tx)
Expand Down Expand Up @@ -809,7 +809,7 @@ func (pool *TxPool) promoteTx(addr common.Address, hash common.Hash, tx *types.T
pendingReplaceMeter.Mark(1)
} else {
// Nothing was replaced, bump the pending counter
pendingCounter.Inc(1)
pendingGauge.Inc(1)
}
// Failsafe to work around direct pending inserts (tests)
if pool.all.Get(hash) == nil {
Expand Down Expand Up @@ -840,7 +840,7 @@ func (pool *TxPool) promoteSpecialTx(addr common.Address, tx *types.Transaction)
pendingReplaceMeter.Mark(1)
} else {
// Nothing was replaced, bump the pending counter
pendingCounter.Inc(1)
pendingGauge.Inc(1)
}
list.txs.Put(tx)
if cost := tx.Cost(); list.costcap.Cmp(cost) < 0 {
Expand Down Expand Up @@ -980,7 +980,7 @@ func (pool *TxPool) removeTx(hash common.Hash, outofbound bool) {
pool.priced.Removed(1)
}
if pool.locals.contains(addr) {
localCounter.Dec(1)
localGauge.Dec(1)
}
// Remove the transaction from the pending lists and reset the account nonce
if pending := pool.pending[addr]; pending != nil {
Expand All @@ -997,15 +997,15 @@ func (pool *TxPool) removeTx(hash common.Hash, outofbound bool) {
// Update the account nonce if needed
pool.pendingNonces.setIfLower(addr, tx.Nonce())
// Reduce the pending counter
pendingCounter.Dec(int64(1 + len(invalids)))
pendingGauge.Dec(int64(1 + len(invalids)))
return
}
}
// Transaction is in the future queue
if future := pool.queue[addr]; future != nil {
if removed, _ := future.Remove(tx); removed {
// Reduce the queued counter
queuedCounter.Dec(1)
queuedGauge.Dec(1)
}
if future.Empty() {
delete(pool.queue, addr)
Expand Down Expand Up @@ -1313,7 +1313,7 @@ func (pool *TxPool) promoteExecutables(accounts []common.Address) []*types.Trans
promoted = append(promoted, tx)
}
}
queuedCounter.Dec(int64(len(readies)))
queuedGauge.Dec(int64(len(readies)))

// Drop all transactions over the allowed limit
var caps types.Transactions
Expand All @@ -1328,9 +1328,9 @@ func (pool *TxPool) promoteExecutables(accounts []common.Address) []*types.Trans
}
// Mark all the items dropped as removed
pool.priced.Removed(len(forwards) + len(drops) + len(caps))
queuedCounter.Dec(int64(len(forwards) + len(drops) + len(caps)))
queuedGauge.Dec(int64(len(forwards) + len(drops) + len(caps)))
if pool.locals.contains(addr) {
localCounter.Dec(int64(len(forwards) + len(drops) + len(caps)))
localGauge.Dec(int64(len(forwards) + len(drops) + len(caps)))
}
// Delete the entire queue entry if it became empty.
if list.Empty() {
Expand Down Expand Up @@ -1389,9 +1389,9 @@ func (pool *TxPool) truncatePending() {
log.Trace("Removed fairness-exceeding pending transaction", "hash", hash)
}
pool.priced.Removed(len(caps))
pendingCounter.Dec(int64(len(caps)))
pendingGauge.Dec(int64(len(caps)))
if pool.locals.contains(offenders[i]) {
localCounter.Dec(int64(len(caps)))
localGauge.Dec(int64(len(caps)))
}
pending--
}
Expand All @@ -1416,9 +1416,9 @@ func (pool *TxPool) truncatePending() {
log.Trace("Removed fairness-exceeding pending transaction", "hash", hash)
}
pool.priced.Removed(len(caps))
pendingCounter.Dec(int64(len(caps)))
pendingGauge.Dec(int64(len(caps)))
if pool.locals.contains(addr) {
localCounter.Dec(int64(len(caps)))
localGauge.Dec(int64(len(caps)))
}
pending--
}
Expand Down Expand Up @@ -1506,9 +1506,9 @@ func (pool *TxPool) demoteUnexecutables() {
log.Trace("Demoting pending transaction", "hash", hash)
pool.enqueueTx(hash, tx)
}
pendingCounter.Dec(int64(len(olds) + len(drops) + len(invalids)))
pendingGauge.Dec(int64(len(olds) + len(drops) + len(invalids)))
if pool.locals.contains(addr) {
localCounter.Dec(int64(len(olds) + len(drops) + len(invalids)))
localGauge.Dec(int64(len(olds) + len(drops) + len(invalids)))
}
// If there's a gap in front, alert (should never happen) and postpone all transactions
if list.Len() > 0 && list.txs.Get(nonce) == nil {
Expand All @@ -1518,7 +1518,7 @@ func (pool *TxPool) demoteUnexecutables() {
log.Warn("Demoting invalidated transaction", "hash", hash)
pool.enqueueTx(hash, tx)
}
pendingCounter.Dec(int64(len(gapped)))
pendingGauge.Dec(int64(len(gapped)))
}
// Delete the entire queue entry if it became empty.
if list.Empty() {
Expand Down
38 changes: 38 additions & 0 deletions metrics/gauge.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ import "sync/atomic"
type Gauge interface {
Snapshot() Gauge
Update(int64)
Dec(int64)
Inc(int64)
Value() int64
}

Expand Down Expand Up @@ -65,6 +67,16 @@ func (GaugeSnapshot) Update(int64) {
panic("Update called on a GaugeSnapshot")
}

// Dec panics.
func (GaugeSnapshot) Dec(int64) {
panic("Dec called on a GaugeSnapshot")
}

// Inc panics.
func (GaugeSnapshot) Inc(int64) {
panic("Inc called on a GaugeSnapshot")
}

// Value returns the value at the time the snapshot was taken.
func (g GaugeSnapshot) Value() int64 { return int64(g) }

Expand All @@ -77,6 +89,12 @@ func (NilGauge) Snapshot() Gauge { return NilGauge{} }
// Update is a no-op.
func (NilGauge) Update(v int64) {}

// Dec is a no-op.
func (NilGauge) Dec(i int64) {}

// Inc is a no-op.
func (NilGauge) Inc(i int64) {}

// Value is a no-op.
func (NilGauge) Value() int64 { return 0 }

Expand All @@ -101,6 +119,16 @@ func (g *StandardGauge) Value() int64 {
return atomic.LoadInt64(&g.value)
}

// Dec decrements the gauge's current value by the given amount.
func (g *StandardGauge) Dec(i int64) {
atomic.AddInt64(&g.value, -i)
}

// Inc increments the gauge's current value by the given amount.
func (g *StandardGauge) Inc(i int64) {
atomic.AddInt64(&g.value, i)
}

// FunctionalGauge returns value from given function
type FunctionalGauge struct {
value func() int64
Expand All @@ -118,3 +146,13 @@ func (g FunctionalGauge) Snapshot() Gauge { return GaugeSnapshot(g.Value()) }
func (FunctionalGauge) Update(int64) {
panic("Update called on a FunctionalGauge")
}

// Dec panics.
func (FunctionalGauge) Dec(int64) {
panic("Dec called on a FunctionalGauge")
}

// Inc panics.
func (FunctionalGauge) Inc(int64) {
panic("Inc called on a FunctionalGauge")
}

0 comments on commit 742a7f9

Please sign in to comment.