Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

make transaction propagation paths in the network deterministic #109

Merged
merged 1 commit into from
Jul 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions eth/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,7 @@ func New(stack *node.Node, config *ethconfig.Config) (*Ethereum, error) {
checkpoint = params.TrustedCheckpoints[genesisHash]
}
if eth.handler, err = newHandler(&handlerConfig{
NodeID: eth.p2pServer.Self().ID(),
Database: chainDb,
Chain: eth.blockchain,
TxPool: eth.txPool,
Expand Down
77 changes: 66 additions & 11 deletions eth/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"github.com/scroll-tech/go-ethereum/core"
"github.com/scroll-tech/go-ethereum/core/forkid"
"github.com/scroll-tech/go-ethereum/core/types"
"github.com/scroll-tech/go-ethereum/crypto"
"github.com/scroll-tech/go-ethereum/eth/downloader"
"github.com/scroll-tech/go-ethereum/eth/fetcher"
"github.com/scroll-tech/go-ethereum/eth/protocols/eth"
Expand All @@ -36,13 +37,20 @@ import (
"github.com/scroll-tech/go-ethereum/event"
"github.com/scroll-tech/go-ethereum/log"
"github.com/scroll-tech/go-ethereum/p2p"
"github.com/scroll-tech/go-ethereum/p2p/enode"
"github.com/scroll-tech/go-ethereum/params"
"golang.org/x/crypto/sha3"
)

const (
// txChanSize is the size of channel listening to NewTxsEvent.
// The number is referenced from the size of tx pool.
txChanSize = 4096

// txMaxBroadcastSize is the max size of a transaction that will be broadcasted.
// All transactions with a higher size will be announced and need to be fetched
// by the peer.
txMaxBroadcastSize = 4096
)

var (
Expand Down Expand Up @@ -75,6 +83,7 @@ type txPool interface {
// handlerConfig is the collection of initialization parameters to create a full
// node network handler.
type handlerConfig struct {
NodeID enode.ID // P2P node ID used for tx propagation topology
Database ethdb.Database // Database for direct sync insertions
Chain *core.BlockChain // Blockchain to serve data from
TxPool txPool // Transaction pool to propagate from
Expand All @@ -87,6 +96,7 @@ type handlerConfig struct {
}

type handler struct {
nodeID enode.ID
networkID uint64
forkFilter forkid.Filter // Fork ID filter, constant across the lifetime of the node

Expand Down Expand Up @@ -128,6 +138,7 @@ func newHandler(config *handlerConfig) (*handler, error) {
config.EventMux = new(event.TypeMux) // Nicety initialization for tests
}
h := &handler{
nodeID: config.NodeID,
networkID: config.Network,
forkFilter: forkid.NewFilter(config.Chain),
eventMux: config.EventMux,
Expand Down Expand Up @@ -549,6 +560,9 @@ func (h *handler) BroadcastBlock(block *types.Block, propagate bool) {
// already have the given transaction.
func (h *handler) BroadcastTransactions(txs types.Transactions) {
var (
blobTxs int // Number of blob transactions to announce only
largeTxs int // Number of large transactions to announce only

annoCount int // Count of announcements made
annoPeers int
directCount int // Count of the txs sent directly to peers
Expand All @@ -558,21 +572,63 @@ func (h *handler) BroadcastTransactions(txs types.Transactions) {
annos = make(map[*ethPeer][]common.Hash) // Set peer->hash to announce

)

// Broadcast transactions to a batch of peers not knowing about it
direct := big.NewInt(int64(math.Sqrt(float64(h.peers.len())))) // Approximate number of peers to broadcast to
if direct.BitLen() == 0 {
direct = big.NewInt(1)
}
total := new(big.Int).Exp(direct, big.NewInt(2), nil) // Stabilise total peer count a bit based on sqrt peers

var (
signer = types.LatestSignerForChainID(h.chain.Config().ChainID) // Don't care about chain status, we just need *a* sender
hasher = sha3.NewLegacyKeccak256().(crypto.KeccakState)
hash = make([]byte, 32)
)

for _, tx := range txs {
// L1 messages are not broadcast to peers
if tx.IsL1MessageTx() {
continue
}
peers := h.peers.peersWithoutTransaction(tx.Hash())
// Send the tx unconditionally to a subset of our peers
numDirect := int(math.Sqrt(float64(len(peers))))
for _, peer := range peers[:numDirect] {
txset[peer] = append(txset[peer], tx.Hash())

var maybeDirect bool
switch {
case tx.Type() == types.BlobTxType:
blobTxs++
case tx.Size() > txMaxBroadcastSize:
largeTxs++
default:
maybeDirect = true
}
// For the remaining peers, send announcement only
for _, peer := range peers[numDirect:] {
annos[peer] = append(annos[peer], tx.Hash())

// Send the transaction (if it's small enough) directly to a subset of
// the peers that have not received it yet, ensuring that the flow of
// transactions is groupped by account to (try and) avoid nonce gaps.
//
// To do this, we hash the local enode IW with together with a peer's
// enode ID together with the transaction sender and broadcast if
// `sha(self, peer, sender) mod peers < sqrt(peers)`.
for _, peer := range h.peers.peersWithoutTransaction(tx.Hash()) {
var broadcast bool
if maybeDirect {
hasher.Reset()
hasher.Write(h.nodeID.Bytes())
hasher.Write(peer.Node().ID().Bytes())

from, _ := types.Sender(signer, tx) // Ignore error, we only use the addr as a propagation target splitter
hasher.Write(from.Bytes())

hasher.Read(hash)
if new(big.Int).Mod(new(big.Int).SetBytes(hash), total).Cmp(direct) < 0 {
broadcast = true
}
}
if broadcast {
txset[peer] = append(txset[peer], tx.Hash())
} else {
annos[peer] = append(annos[peer], tx.Hash())
}
}
}
for peer, hashes := range txset {
Expand All @@ -585,9 +641,8 @@ func (h *handler) BroadcastTransactions(txs types.Transactions) {
annoCount += len(hashes)
peer.AsyncSendPooledTransactionHashes(hashes)
}
log.Debug("Transaction broadcast", "txs", len(txs),
"announce packs", annoPeers, "announced hashes", annoCount,
"tx packs", directPeers, "broadcast txs", directCount)
log.Debug("Distributed transactions", "plaintxs", len(txs)-blobTxs-largeTxs, "blobtxs", blobTxs, "largetxs", largeTxs,
"bcastpeers", directPeers, "bcastcount", directCount, "annpeers", annoPeers, "anncount", annoCount)
}

// minedBroadcastLoop sends mined blocks to connected peers.
Expand Down