Skip to content

Commit

Permalink
Make X-chain mempool safe for concurrent use (#2520)
Browse files Browse the repository at this point in the history
  • Loading branch information
StephenButtolph authored Dec 20, 2023
1 parent 9d19143 commit 0d0ac62
Show file tree
Hide file tree
Showing 6 changed files with 298 additions and 208 deletions.
4 changes: 3 additions & 1 deletion vms/avm/block/builder/builder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -510,7 +510,9 @@ func TestBlockBuilderAddLocalTx(t *testing.T) {
tx := transactions[0]
txID := tx.ID()
require.NoError(mempool.Add(tx))
require.True(mempool.Has(txID))

_, ok := mempool.Get(txID)
require.True(ok)

parser, err := block.NewParser([]fxs.Fx{
&secp256k1fx.Fx{},
Expand Down
2 changes: 1 addition & 1 deletion vms/avm/network/network.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ func (n *network) IssueTx(ctx context.Context, tx *txs.Tx) error {
// returns nil if the tx is in the mempool
func (n *network) issueTx(tx *txs.Tx) error {
txID := tx.ID()
if n.mempool.Has(txID) {
if _, ok := n.mempool.Get(txID); ok {
// The tx is already in the mempool
return nil
}
Expand Down
22 changes: 12 additions & 10 deletions vms/avm/network/network_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,8 @@ func TestNetworkAppGossip(t *testing.T) {
},
},
{
// Issue returns nil because mempool has tx. We should gossip the tx.
// Issue returns nil because mempool has tx. We haven't gossipped
// the tx recently, so we should gossip it.
name: "issuance succeeds",
msgBytesFunc: func() []byte {
msg := message.Tx{
Expand All @@ -103,17 +104,18 @@ func TestNetworkAppGossip(t *testing.T) {
},
mempoolFunc: func(ctrl *gomock.Controller) mempool.Mempool {
mempool := mempool.NewMockMempool(ctrl)
mempool.EXPECT().Has(gomock.Any()).Return(true)
mempool.EXPECT().Get(gomock.Any()).Return(testTx, true)
return mempool
},
appSenderFunc: func(ctrl *gomock.Controller) common.AppSender {
appSender := common.NewMockSender(ctrl)
appSender.EXPECT().SendAppGossip(gomock.Any(), gomock.Any())
appSender.EXPECT().SendAppGossip(gomock.Any(), gomock.Any()).Times(1)
return appSender
},
},
{
// Issue returns error because tx was dropped. We shouldn't gossip the tx.
// Issue returns error because tx was dropped. We shouldn't gossip
// the tx.
name: "issuance fails",
msgBytesFunc: func() []byte {
msg := message.Tx{
Expand All @@ -125,7 +127,7 @@ func TestNetworkAppGossip(t *testing.T) {
},
mempoolFunc: func(ctrl *gomock.Controller) mempool.Mempool {
mempool := mempool.NewMockMempool(ctrl)
mempool.EXPECT().Has(gomock.Any()).Return(false)
mempool.EXPECT().Get(gomock.Any()).Return(nil, false)
mempool.EXPECT().GetDropReason(gomock.Any()).Return(errTest)
return mempool
},
Expand Down Expand Up @@ -176,7 +178,7 @@ func TestNetworkIssueTx(t *testing.T) {
name: "mempool has transaction",
mempoolFunc: func(ctrl *gomock.Controller) mempool.Mempool {
mempool := mempool.NewMockMempool(ctrl)
mempool.EXPECT().Has(gomock.Any()).Return(true)
mempool.EXPECT().Get(gomock.Any()).Return(nil, true)
return mempool
},
managerFunc: func(ctrl *gomock.Controller) executor.Manager {
Expand All @@ -195,7 +197,7 @@ func TestNetworkIssueTx(t *testing.T) {
name: "transaction marked as dropped in mempool",
mempoolFunc: func(ctrl *gomock.Controller) mempool.Mempool {
mempool := mempool.NewMockMempool(ctrl)
mempool.EXPECT().Has(gomock.Any()).Return(false)
mempool.EXPECT().Get(gomock.Any()).Return(nil, false)
mempool.EXPECT().GetDropReason(gomock.Any()).Return(errTest)
return mempool
},
Expand All @@ -213,7 +215,7 @@ func TestNetworkIssueTx(t *testing.T) {
name: "transaction invalid",
mempoolFunc: func(ctrl *gomock.Controller) mempool.Mempool {
mempool := mempool.NewMockMempool(ctrl)
mempool.EXPECT().Has(gomock.Any()).Return(false)
mempool.EXPECT().Get(gomock.Any()).Return(nil, false)
mempool.EXPECT().GetDropReason(gomock.Any()).Return(nil)
mempool.EXPECT().MarkDropped(gomock.Any(), gomock.Any())
return mempool
Expand All @@ -233,7 +235,7 @@ func TestNetworkIssueTx(t *testing.T) {
name: "can't add transaction to mempool",
mempoolFunc: func(ctrl *gomock.Controller) mempool.Mempool {
mempool := mempool.NewMockMempool(ctrl)
mempool.EXPECT().Has(gomock.Any()).Return(false)
mempool.EXPECT().Get(gomock.Any()).Return(nil, false)
mempool.EXPECT().GetDropReason(gomock.Any()).Return(nil)
mempool.EXPECT().Add(gomock.Any()).Return(errTest)
mempool.EXPECT().MarkDropped(gomock.Any(), gomock.Any())
Expand All @@ -254,7 +256,7 @@ func TestNetworkIssueTx(t *testing.T) {
name: "happy path",
mempoolFunc: func(ctrl *gomock.Controller) mempool.Mempool {
mempool := mempool.NewMockMempool(ctrl)
mempool.EXPECT().Has(gomock.Any()).Return(false)
mempool.EXPECT().Get(gomock.Any()).Return(nil, false)
mempool.EXPECT().GetDropReason(gomock.Any()).Return(nil)
mempool.EXPECT().Add(gomock.Any()).Return(nil)
mempool.EXPECT().RequestBuildBlock()
Expand Down
116 changes: 66 additions & 50 deletions vms/avm/txs/mempool/mempool.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,14 @@ package mempool
import (
"errors"
"fmt"
"sync"

"github.com/prometheus/client_golang/prometheus"

"github.com/ava-labs/avalanchego/cache"
"github.com/ava-labs/avalanchego/ids"
"github.com/ava-labs/avalanchego/snow/engine/common"
"github.com/ava-labs/avalanchego/utils"
"github.com/ava-labs/avalanchego/utils/linkedhashmap"
"github.com/ava-labs/avalanchego/utils/set"
"github.com/ava-labs/avalanchego/utils/units"
Expand All @@ -26,8 +28,6 @@ const (
// droppedTxIDsCacheSize is the maximum number of dropped txIDs to cache
droppedTxIDsCacheSize = 64

initialConsumedUTXOsSize = 512

// maxMempoolSize is the maximum number of bytes allowed in the mempool
maxMempoolSize = 64 * units.MiB
)
Expand All @@ -44,13 +44,16 @@ var (
// Mempool contains transactions that have not yet been put into a block.
type Mempool interface {
Add(tx *txs.Tx) error
Has(txID ids.ID) bool
Get(txID ids.ID) *txs.Tx
Get(txID ids.ID) (*txs.Tx, bool)
Remove(txs []*txs.Tx)

// Peek returns the oldest tx in the mempool.
Peek() (tx *txs.Tx, exists bool)

// Iterate over transactions from oldest to newest until the function
// returns false or there are no more transactions.
Iterate(f func(tx *txs.Tx) bool)

// RequestBuildBlock notifies the consensus engine that a block should be
// built if there is at least one transaction in the mempool.
RequestBuildBlock()
Expand All @@ -62,60 +65,55 @@ type Mempool interface {
}

type mempool struct {
bytesAvailableMetric prometheus.Gauge
bytesAvailable int

unissuedTxs linkedhashmap.LinkedHashmap[ids.ID, *txs.Tx]
numTxs prometheus.Gauge
lock sync.RWMutex
unissuedTxs linkedhashmap.LinkedHashmap[ids.ID, *txs.Tx]
consumedUTXOs set.Set[ids.ID]
bytesAvailable int
droppedTxIDs *cache.LRU[ids.ID, error] // TxID -> Verification error

toEngine chan<- common.Message

// Key: Tx ID
// Value: Verification error
droppedTxIDs *cache.LRU[ids.ID, error]

consumedUTXOs set.Set[ids.ID]
numTxs prometheus.Gauge
bytesAvailableMetric prometheus.Gauge
}

func New(
namespace string,
registerer prometheus.Registerer,
toEngine chan<- common.Message,
) (Mempool, error) {
bytesAvailableMetric := prometheus.NewGauge(prometheus.GaugeOpts{
Namespace: namespace,
Name: "bytes_available",
Help: "Number of bytes of space currently available in the mempool",
})
if err := registerer.Register(bytesAvailableMetric); err != nil {
return nil, err
m := &mempool{
unissuedTxs: linkedhashmap.New[ids.ID, *txs.Tx](),
bytesAvailable: maxMempoolSize,
droppedTxIDs: &cache.LRU[ids.ID, error]{Size: droppedTxIDsCacheSize},
toEngine: toEngine,
numTxs: prometheus.NewGauge(prometheus.GaugeOpts{
Namespace: namespace,
Name: "count",
Help: "Number of transactions in the mempool",
}),
bytesAvailableMetric: prometheus.NewGauge(prometheus.GaugeOpts{
Namespace: namespace,
Name: "bytes_available",
Help: "Number of bytes of space currently available in the mempool",
}),
}
m.bytesAvailableMetric.Set(maxMempoolSize)

numTxsMetric := prometheus.NewGauge(prometheus.GaugeOpts{
Namespace: namespace,
Name: "count",
Help: "Number of transactions in the mempool",
})
if err := registerer.Register(numTxsMetric); err != nil {
return nil, err
}

bytesAvailableMetric.Set(maxMempoolSize)
return &mempool{
bytesAvailableMetric: bytesAvailableMetric,
bytesAvailable: maxMempoolSize,
unissuedTxs: linkedhashmap.New[ids.ID, *txs.Tx](),
numTxs: numTxsMetric,
toEngine: toEngine,
droppedTxIDs: &cache.LRU[ids.ID, error]{Size: droppedTxIDsCacheSize},
consumedUTXOs: set.NewSet[ids.ID](initialConsumedUTXOsSize),
}, nil
err := utils.Err(
registerer.Register(m.numTxs),
registerer.Register(m.bytesAvailableMetric),
)
return m, err
}

func (m *mempool) Add(tx *txs.Tx) error {
// Note: a previously dropped tx can be re-added
txID := tx.ID()
if m.Has(txID) {

m.lock.Lock()
defer m.lock.Unlock()

if _, ok := m.unissuedTxs.Get(txID); ok {
return fmt.Errorf("%w: %s", errDuplicateTx, txID)
}

Expand Down Expand Up @@ -151,21 +149,20 @@ func (m *mempool) Add(tx *txs.Tx) error {
// Mark these UTXOs as consumed in the mempool
m.consumedUTXOs.Union(inputs)

// An explicitly added tx must not be marked as dropped.
// An added tx must not be marked as dropped.
m.droppedTxIDs.Evict(txID)
return nil
}

func (m *mempool) Has(txID ids.ID) bool {
return m.Get(txID) != nil
}

func (m *mempool) Get(txID ids.ID) *txs.Tx {
tx, _ := m.unissuedTxs.Get(txID)
return tx
func (m *mempool) Get(txID ids.ID) (*txs.Tx, bool) {
tx, ok := m.unissuedTxs.Get(txID)
return tx, ok
}

func (m *mempool) Remove(txsToRemove []*txs.Tx) {
m.lock.Lock()
defer m.lock.Unlock()

for _, tx := range txsToRemove {
txID := tx.ID()
if !m.unissuedTxs.Delete(txID) {
Expand All @@ -187,6 +184,18 @@ func (m *mempool) Peek() (*txs.Tx, bool) {
return tx, exists
}

func (m *mempool) Iterate(f func(*txs.Tx) bool) {
m.lock.RLock()
defer m.lock.RUnlock()

it := m.unissuedTxs.NewIterator()
for it.Next() {
if !f(it.Value()) {
return
}
}
}

func (m *mempool) RequestBuildBlock() {
if m.unissuedTxs.Len() == 0 {
return
Expand All @@ -199,6 +208,13 @@ func (m *mempool) RequestBuildBlock() {
}

func (m *mempool) MarkDropped(txID ids.ID, reason error) {
m.lock.RLock()
defer m.lock.RUnlock()

if _, ok := m.unissuedTxs.Get(txID); ok {
return
}

m.droppedTxIDs.Put(txID, reason)
}

Expand Down
Loading

0 comments on commit 0d0ac62

Please sign in to comment.