Skip to content

Commit

Permalink
Move locking into issueTx (#2532)
Browse files Browse the repository at this point in the history
  • Loading branch information
StephenButtolph authored Dec 21, 2023
1 parent 9a63776 commit 1f8f10f
Show file tree
Hide file tree
Showing 4 changed files with 213 additions and 71 deletions.
81 changes: 52 additions & 29 deletions vms/avm/network/network.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package network

import (
"context"
"fmt"
"sync"

"go.uber.org/zap"
Expand All @@ -28,11 +29,25 @@ var _ Network = (*network)(nil)
type Network interface {
common.AppHandler

// IssueTx verifies the transaction at the currently preferred state, adds
// it to the mempool, and gossips it to the network.
// IssueTx attempts to add a tx to the mempool, after verifying it against
// the preferred state. If the tx is added to the mempool, it will attempt
// to push gossip the tx to random peers in the network.
//
// Invariant: Assumes the context lock is held.
// If the tx is already in the mempool, mempool.ErrDuplicateTx will be
// returned.
// If the tx is not added to the mempool, an error will be returned.
//
// Invariant: The context lock is not held
IssueTx(context.Context, *txs.Tx) error

// IssueVerifiedTx attempts to add a tx to the mempool. If the tx is added
// to the mempool, it will attempt to push gossip the tx to random peers in
// the network.
//
// If the tx is already in the mempool, mempool.ErrDuplicateTx will be
// returned.
// If the tx is not added to the mempool, an error will be returned.
IssueVerifiedTx(context.Context, *txs.Tx) error
}

type network struct {
Expand Down Expand Up @@ -103,18 +118,10 @@ func (n *network) AppGossip(ctx context.Context, nodeID ids.NodeID, msgBytes []b
)
return nil
}
txID := tx.ID()

// We need to grab the context lock here to avoid racy behavior with
// transaction verification + mempool modifications.
//
// Invariant: tx should not be referenced again without the context lock
// held to avoid any data races.
n.ctx.Lock.Lock()
err = n.issueTx(tx)
n.ctx.Lock.Unlock()
if err == nil {
n.gossipTx(ctx, txID, msgBytes)
if err := n.issueTx(tx); err == nil {
txID := tx.ID()
n.gossipTxMessage(ctx, txID, msgBytes)
}
return nil
}
Expand All @@ -123,27 +130,20 @@ func (n *network) IssueTx(ctx context.Context, tx *txs.Tx) error {
if err := n.issueTx(tx); err != nil {
return err
}
return n.gossipTx(ctx, tx)
}

txBytes := tx.Bytes()
msg := &message.Tx{
Tx: txBytes,
}
msgBytes, err := message.Build(msg)
if err != nil {
func (n *network) IssueVerifiedTx(ctx context.Context, tx *txs.Tx) error {
if err := n.issueVerifiedTx(tx); err != nil {
return err
}

txID := tx.ID()
n.gossipTx(ctx, txID, msgBytes)
return nil
return n.gossipTx(ctx, tx)
}

// returns nil if the tx is in the mempool
func (n *network) issueTx(tx *txs.Tx) error {
txID := tx.ID()
if _, ok := n.mempool.Get(txID); ok {
// The tx is already in the mempool
return nil
return fmt.Errorf("attempted to issue %w: %s ", mempool.ErrDuplicateTx, txID)
}

if reason := n.mempool.GetDropReason(txID); reason != nil {
Expand All @@ -155,7 +155,10 @@ func (n *network) issueTx(tx *txs.Tx) error {
}

// Verify the tx at the currently preferred state
if err := n.manager.VerifyTx(tx); err != nil {
n.ctx.Lock.Lock()
err := n.manager.VerifyTx(tx)
n.ctx.Lock.Unlock()
if err != nil {
n.ctx.Log.Debug("tx failed verification",
zap.Stringer("txID", txID),
zap.Error(err),
Expand All @@ -165,7 +168,12 @@ func (n *network) issueTx(tx *txs.Tx) error {
return err
}

return n.issueVerifiedTx(tx)
}

func (n *network) issueVerifiedTx(tx *txs.Tx) error {
if err := n.mempool.Add(tx); err != nil {
txID := tx.ID()
n.ctx.Log.Debug("tx failed to be added to the mempool",
zap.Stringer("txID", txID),
zap.Error(err),
Expand All @@ -179,7 +187,22 @@ func (n *network) issueTx(tx *txs.Tx) error {
return nil
}

func (n *network) gossipTx(ctx context.Context, txID ids.ID, msgBytes []byte) {
func (n *network) gossipTx(ctx context.Context, tx *txs.Tx) error {
txBytes := tx.Bytes()
msg := &message.Tx{
Tx: txBytes,
}
msgBytes, err := message.Build(msg)
if err != nil {
return err
}

txID := tx.ID()
n.gossipTxMessage(ctx, txID, msgBytes)
return nil
}

func (n *network) gossipTxMessage(ctx context.Context, txID ids.ID, msgBytes []byte) {
n.recentTxsLock.Lock()
_, has := n.recentTxs.Get(txID)
n.recentTxs.Put(txID, struct{}{})
Expand Down
Loading

0 comments on commit 1f8f10f

Please sign in to comment.