diff --git a/.github/CONTRIBUTING.md b/.github/CONTRIBUTING.md index 9d43f6b5e1..5381269fcb 100644 --- a/.github/CONTRIBUTING.md +++ b/.github/CONTRIBUTING.md @@ -1,14 +1,14 @@ # Contributing -Thank you for considering to help out with the source code! We welcome -contributions from anyone on the internet, and are grateful for even the +Thank you for considering to help out with the source code! We welcome +contributions from anyone on the internet, and are grateful for even the smallest of fixes! -If you'd like to contribute to coreth, please fork, fix, commit and send a +If you'd like to contribute to subnet-evm, please fork, fix, commit and send a pull request for the maintainers to review and merge into the main code base. If -you wish to submit more complex changes though, please check up with the core -devs first on [Discord](https://chat.avalabs.org) to -ensure those changes are in line with the general philosophy of the project +you wish to submit more complex changes though, please check up with the core +devs first on [Discord](https://chat.avalabs.org) to +ensure those changes are in line with the general philosophy of the project and/or get some early feedback which can make both your efforts much lighter as well as our review and merge procedures quick and simple. @@ -16,20 +16,19 @@ well as our review and merge procedures quick and simple. Please make sure your contributions adhere to our coding guidelines: - * Code must adhere to the official Go -[formatting](https://golang.org/doc/effective_go.html#formatting) guidelines -(i.e. uses [gofmt](https://golang.org/cmd/gofmt/)). - * Code must be documented adhering to the official Go -[commentary](https://golang.org/doc/effective_go.html#commentary) guidelines. - * Pull requests need to be based on and opened against the `master` branch. - * Pull reuqests should include a detailed description - * Commits are required to be signed. See [here](https://docs.github.com/en/authentication/managing-commit-signature-verification/signing-commits) - for information on signing commits. - * Commit messages should be prefixed with the package(s) they modify. - * E.g. "eth, rpc: make trace configs optional" +- Code must adhere to the official Go + [formatting](https://golang.org/doc/effective_go.html#formatting) guidelines + (i.e. uses [gofmt](https://golang.org/cmd/gofmt/)). +- Code must be documented adhering to the official Go + [commentary](https://golang.org/doc/effective_go.html#commentary) guidelines. +- Pull requests need to be based on and opened against the `master` branch. +- Pull reuqests should include a detailed description +- Commits are required to be signed. See [here](https://docs.github.com/en/authentication/managing-commit-signature-verification/signing-commits) + for information on signing commits. +- Commit messages should be prefixed with the package(s) they modify. + - E.g. "eth, rpc: make trace configs optional" ## Can I have feature X -Before you submit a feature request, please check and make sure that it isn't +Before you submit a feature request, please check and make sure that it isn't possible through some other means. - diff --git a/core/blockchain.go b/core/blockchain.go index 226e32a96e..50e688e473 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -85,9 +85,13 @@ var ( acceptedBlockGasUsedCounter = metrics.NewRegisteredCounter("chain/block/gas/used/accepted", nil) badBlockCounter = metrics.NewRegisteredCounter("chain/block/bad/count", nil) + txUnindexTimer = metrics.NewRegisteredCounter("chain/txs/unindex", nil) acceptedTxsCounter = metrics.NewRegisteredCounter("chain/txs/accepted", nil) processedTxsCounter = metrics.NewRegisteredCounter("chain/txs/processed", nil) + acceptedLogsCounter = metrics.NewRegisteredCounter("chain/logs/accepted", nil) + processedLogsCounter = metrics.NewRegisteredCounter("chain/logs/processed", nil) + ErrRefuseToCorruptArchiver = errors.New("node has operated with pruning disabled, shutting down to prevent missing tries") errFutureBlockUnsupported = errors.New("future block insertion not supported") @@ -102,7 +106,6 @@ const ( feeConfigCacheLimit = 256 coinbaseConfigCacheLimit = 256 badBlockLimit = 10 - TriesInMemory = 128 // BlockChainVersion ensures that an incompatible database forces a resync from scratch. // @@ -173,6 +176,7 @@ type CacheConfig struct { SkipSnapshotRebuild bool // Whether to skip rebuilding the snapshot in favor of returning an error (only set to true for tests) Preimages bool // Whether to store preimage of trie key to the disk AcceptedCacheSize int // Depth of accepted headers cache and accepted logs cache at the accepted tip + TxLookupLimit uint64 // Number of recent blocks for which to maintain transaction lookup indices } var DefaultCacheConfig = &CacheConfig{ @@ -269,9 +273,8 @@ type BlockChain struct { // during shutdown and in tests. acceptorWg sync.WaitGroup - // [rejournalWg] is used to wait for the trie clean rejournaling to complete. - // This is used during shutdown. - rejournalWg sync.WaitGroup + // [wg] is used to wait for the async blockchain processes to finish on shutdown. + wg sync.WaitGroup // quit channel is used to listen for when the blockchain is shut down to close // async processes. @@ -354,6 +357,13 @@ func NewBlockChain( // Create the state manager bc.stateManager = NewTrieWriter(bc.stateCache.TrieDB(), cacheConfig) + // loadLastState writes indices, so we should start the tx indexer after that. + // Start tx indexer/unindexer here. + if bc.cacheConfig.TxLookupLimit != 0 { + bc.wg.Add(1) + go bc.dispatchTxUnindexer() + } + // Re-generate current block state if it is missing if err := bc.loadLastState(lastAcceptedHash); err != nil { return nil, err @@ -401,9 +411,9 @@ func NewBlockChain( log.Info("Starting to save trie clean cache periodically", "journalDir", bc.cacheConfig.TrieCleanJournal, "freq", bc.cacheConfig.TrieCleanRejournal) triedb := bc.stateCache.TrieDB() - bc.rejournalWg.Add(1) + bc.wg.Add(1) go func() { - defer bc.rejournalWg.Done() + defer bc.wg.Done() triedb.SaveCachePeriodically(bc.cacheConfig.TrieCleanJournal, bc.cacheConfig.TrieCleanRejournal, bc.quit) }() } @@ -411,6 +421,72 @@ func NewBlockChain( return bc, nil } +// dispatchTxUnindexer is responsible for the deletion of the +// transaction index. +// Invariant: If TxLookupLimit is 0, it means all tx indices will be preserved. +// Meaning that this function should never be called. +func (bc *BlockChain) dispatchTxUnindexer() { + defer bc.wg.Done() + txLookupLimit := bc.cacheConfig.TxLookupLimit + + // If the user just upgraded to a new version which supports transaction + // index pruning, write the new tail and remove anything older. + if rawdb.ReadTxIndexTail(bc.db) == nil { + rawdb.WriteTxIndexTail(bc.db, 0) + } + + // unindexes transactions depending on user configuration + unindexBlocks := func(tail uint64, head uint64, done chan struct{}) { + start := time.Now() + defer func() { + txUnindexTimer.Inc(time.Since(start).Milliseconds()) + done <- struct{}{} + }() + + // Update the transaction index to the new chain state + if head-txLookupLimit+1 >= tail { + // Unindex a part of stale indices and forward index tail to HEAD-limit + rawdb.UnindexTransactions(bc.db, tail, head-txLookupLimit+1, bc.quit) + } + } + // Any reindexing done, start listening to chain events and moving the index window + var ( + done chan struct{} // Non-nil if background unindexing or reindexing routine is active. + headCh = make(chan ChainEvent, 1) // Buffered to avoid locking up the event feed + ) + sub := bc.SubscribeChainAcceptedEvent(headCh) + if sub == nil { + log.Warn("could not create chain accepted subscription to unindex txs") + return + } + defer sub.Unsubscribe() + + for { + select { + case head := <-headCh: + headNum := head.Block.NumberU64() + if headNum < txLookupLimit { + break + } + + if done == nil { + done = make(chan struct{}) + // Note: tail will not be nil since it is initialized in this function. + tail := rawdb.ReadTxIndexTail(bc.db) + go unindexBlocks(*tail, headNum, done) + } + case <-done: + done = nil + case <-bc.quit: + if done != nil { + log.Info("Waiting background transaction indexer to exit") + <-done + } + return + } + } +} + // writeBlockAcceptedIndices writes any indices that must be persisted for accepted block. // This includes the following: // - transaction lookup indices @@ -532,6 +608,9 @@ func (bc *BlockChain) startAcceptor() { acceptorWorkTimer.Inc(time.Since(start).Milliseconds()) acceptorWorkCount.Inc(1) + // Note: in contrast to most accepted metrics, we increment the accepted log metrics in the acceptor queue because + // the logs are already processed in the acceptor queue. + acceptedLogsCounter.Inc(int64(len(logs))) } } @@ -555,8 +634,8 @@ func (bc *BlockChain) addAcceptorQueue(b *types.Block) { // DrainAcceptorQueue blocks until all items in [acceptorQueue] have been // processed. func (bc *BlockChain) DrainAcceptorQueue() { - bc.acceptorClosingLock.Lock() - defer bc.acceptorClosingLock.Unlock() + bc.acceptorClosingLock.RLock() + defer bc.acceptorClosingLock.RUnlock() if bc.acceptorClosed { return @@ -782,7 +861,8 @@ func (bc *BlockChain) ValidateCanonicalChain() error { // Transactions are only indexed beneath the last accepted block, so we only check // that the transactions have been indexed, if we are checking below the last accepted // block. - if current.NumberU64() <= bc.lastAccepted.NumberU64() { + shouldIndexTxs := bc.cacheConfig.TxLookupLimit == 0 || bc.lastAccepted.NumberU64() < current.NumberU64()+bc.cacheConfig.TxLookupLimit + if current.NumberU64() <= bc.lastAccepted.NumberU64() && shouldIndexTxs { // Ensure that all of the transactions have been stored correctly in the canonical // chain for txIndex, tx := range txs { @@ -840,7 +920,6 @@ func (bc *BlockChain) Stop() { return } - // Wait for accepted feed to process all remaining items log.Info("Closing quit channel") close(bc.quit) // Wait for accepted feed to process all remaining items @@ -868,9 +947,9 @@ func (bc *BlockChain) Stop() { log.Info("Closing scope") bc.scope.Close() - // Waiting for clean trie re-journal to complete - log.Info("Waiting for trie re-journal to complete") - bc.rejournalWg.Wait() + // Waiting for background processes to complete + log.Info("Waiting for background processes to complete") + bc.wg.Wait() log.Info("Blockchain stopped") } @@ -1313,6 +1392,7 @@ func (bc *BlockChain) insertBlock(block *types.Block, writes bool) error { processedBlockGasUsedCounter.Inc(int64(block.GasUsed())) processedTxsCounter.Inc(int64(block.Transactions().Len())) + processedLogsCounter.Inc(int64(len(logs))) blockInsertCount.Inc(1) return nil } diff --git a/core/blockchain_test.go b/core/blockchain_test.go index 270eef14df..e4eb86c3d0 100644 --- a/core/blockchain_test.go +++ b/core/blockchain_test.go @@ -745,3 +745,142 @@ func TestCanonicalHashMarker(t *testing.T) { } } } + +func TestTransactionIndices(t *testing.T) { + // Configure and generate a sample block chain + require := require.New(t) + var ( + gendb = rawdb.NewMemoryDatabase() + key1, _ = crypto.HexToECDSA("b71c71a67e1177ad4e901695e1b4b9ee17ae16c6668d313eac2f96dbcda3f291") + key2, _ = crypto.HexToECDSA("8a1f9a8f95be41cd7ccb6168179afb4504aefe388d1e14474d32c45c72ce7b7a") + addr1 = crypto.PubkeyToAddress(key1.PublicKey) + addr2 = crypto.PubkeyToAddress(key2.PublicKey) + funds = big.NewInt(10000000000000) + gspec = &Genesis{ + Config: ¶ms.ChainConfig{HomesteadBlock: new(big.Int)}, + Alloc: GenesisAlloc{addr1: {Balance: funds}}, + } + genesis = gspec.MustCommit(gendb) + signer = types.LatestSigner(gspec.Config) + ) + height := uint64(128) + blocks, _, err := GenerateChain(gspec.Config, genesis, dummy.NewFaker(), gendb, int(height), 10, func(i int, block *BlockGen) { + tx, err := types.SignTx(types.NewTransaction(block.TxNonce(addr1), addr2, big.NewInt(10000), params.TxGas, nil, nil), signer, key1) + require.NoError(err) + block.AddTx(tx) + }) + require.NoError(err) + + blocks2, _, err := GenerateChain(gspec.Config, blocks[len(blocks)-1], dummy.NewFaker(), gendb, 10, 10, nil) + require.NoError(err) + + check := func(tail *uint64, chain *BlockChain) { + stored := rawdb.ReadTxIndexTail(chain.db) + require.EqualValues(tail, stored) + + if tail == nil { + return + } + for i := *tail; i <= chain.CurrentBlock().NumberU64(); i++ { + block := rawdb.ReadBlock(chain.db, rawdb.ReadCanonicalHash(chain.db, i), i) + if block.Transactions().Len() == 0 { + continue + } + for _, tx := range block.Transactions() { + index := rawdb.ReadTxLookupEntry(chain.db, tx.Hash()) + require.NotNilf(index, "Miss transaction indices, number %d hash %s", i, tx.Hash().Hex()) + } + } + + for i := uint64(0); i < *tail; i++ { + block := rawdb.ReadBlock(chain.db, rawdb.ReadCanonicalHash(chain.db, i), i) + if block.Transactions().Len() == 0 { + continue + } + for _, tx := range block.Transactions() { + index := rawdb.ReadTxLookupEntry(chain.db, tx.Hash()) + require.Nilf(index, "Transaction indices should be deleted, number %d hash %s", i, tx.Hash().Hex()) + } + } + } + + conf := &CacheConfig{ + TrieCleanLimit: 256, + TrieDirtyLimit: 256, + TrieDirtyCommitTarget: 20, + Pruning: true, + CommitInterval: 4096, + SnapshotLimit: 256, + SkipSnapshotRebuild: true, // Ensure the test errors if snapshot initialization fails + AcceptorQueueLimit: 64, + } + + // Init block chain and check all needed indices has been indexed. + chainDB := rawdb.NewMemoryDatabase() + gspec.MustCommit(chainDB) + + chain, err := createBlockChain(chainDB, conf, gspec.Config, common.Hash{}) + require.NoError(err) + + _, err = chain.InsertChain(blocks) + require.NoError(err) + + for _, block := range blocks { + err := chain.Accept(block) + require.NoError(err) + } + chain.DrainAcceptorQueue() + + chain.Stop() + check(nil, chain) // check all indices has been indexed + + lastAcceptedHash := chain.CurrentHeader().Hash() + + // Reconstruct a block chain which only reserves limited tx indices + // 128 blocks were previously indexed. Now we add a new block at each test step. + limit := []uint64{130 /* 129 + 1 reserve all */, 64 /* drop stale */, 32 /* shorten history */} + tails := []uint64{0 /* reserve all */, 67 /* 130 - 64 + 1 */, 100 /* 131 - 32 + 1 */} + for i, l := range limit { + conf.TxLookupLimit = l + + chain, err := createBlockChain(chainDB, conf, gspec.Config, lastAcceptedHash) + require.NoError(err) + + newBlks := blocks2[i : i+1] + _, err = chain.InsertChain(newBlks) // Feed chain a higher block to trigger indices updater. + require.NoError(err) + + err = chain.Accept(newBlks[0]) // Accept the block to trigger indices updater. + require.NoError(err) + + chain.DrainAcceptorQueue() + time.Sleep(50 * time.Millisecond) // Wait for indices initialisation + + chain.Stop() + check(&tails[i], chain) + + lastAcceptedHash = chain.CurrentHeader().Hash() + } +} + +func TestTxLookupBlockChain(t *testing.T) { + cacheConf := &CacheConfig{ + TrieCleanLimit: 256, + TrieDirtyLimit: 256, + TrieDirtyCommitTarget: 20, + Pruning: true, + CommitInterval: 4096, + SnapshotLimit: 256, + SkipSnapshotRebuild: true, // Ensure the test errors if snapshot initialization fails + AcceptorQueueLimit: 64, // ensure channel doesn't block + TxLookupLimit: 5, + } + createTxLookupBlockChain := func(db ethdb.Database, chainConfig *params.ChainConfig, lastAcceptedHash common.Hash) (*BlockChain, error) { + return createBlockChain(db, cacheConf, chainConfig, lastAcceptedHash) + } + for _, tt := range tests { + t.Run(tt.Name, func(t *testing.T) { + tt.testFunc(t, createTxLookupBlockChain) + }) + } +} diff --git a/core/rawdb/accessors_chain.go b/core/rawdb/accessors_chain.go index d7add47ed8..8b4829139c 100644 --- a/core/rawdb/accessors_chain.go +++ b/core/rawdb/accessors_chain.go @@ -588,3 +588,23 @@ func ReadHeadBlock(db ethdb.Reader) *types.Block { } return ReadBlock(db, headBlockHash, *headBlockNumber) } + +// ReadTxIndexTail retrieves the number of oldest indexed block +// whose transaction indices has been indexed. If the corresponding entry +// is non-existent in database it means the indexing has been finished. +func ReadTxIndexTail(db ethdb.KeyValueReader) *uint64 { + data, _ := db.Get(txIndexTailKey) + if len(data) != 8 { + return nil + } + number := binary.BigEndian.Uint64(data) + return &number +} + +// WriteTxIndexTail stores the number of oldest indexed block +// into database. +func WriteTxIndexTail(db ethdb.KeyValueWriter, number uint64) { + if err := db.Put(txIndexTailKey, encodeBlockNumber(number)); err != nil { + log.Crit("Failed to store the transaction index tail", "err", err) + } +} diff --git a/core/rawdb/chain_iterator.go b/core/rawdb/chain_iterator.go new file mode 100644 index 0000000000..cba39b57c6 --- /dev/null +++ b/core/rawdb/chain_iterator.go @@ -0,0 +1,311 @@ +// (c) 2019-2022, Ava Labs, Inc. +// +// This file is a derived work, based on the go-ethereum library whose original +// notices appear below. +// +// It is distributed under a license compatible with the licensing terms of the +// original code from which it is derived. +// +// Much love to the original authors for their work. +// ********** +// Copyright 2020 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see . + +package rawdb + +import ( + "runtime" + "sync/atomic" + "time" + + "github.com/ava-labs/subnet-evm/core/types" + "github.com/ava-labs/subnet-evm/ethdb" + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/common/prque" + "github.com/ethereum/go-ethereum/log" + "github.com/ethereum/go-ethereum/rlp" +) + +type blockTxHashes struct { + number uint64 + hashes []common.Hash +} + +// iterateTransactions iterates over all transactions in the (canon) block +// number(s) given, and yields the hashes on a channel. If there is a signal +// received from interrupt channel, the iteration will be aborted and result +// channel will be closed. +// Iterates blocks in the range [from, to) +func iterateTransactions(db ethdb.Database, from uint64, to uint64, reverse bool, interrupt chan struct{}) chan *blockTxHashes { + // One thread sequentially reads data from db + type numberRlp struct { + number uint64 + rlp rlp.RawValue + } + if to == from { + return nil + } + threads := to - from + if cpus := runtime.NumCPU(); threads > uint64(cpus) { + threads = uint64(cpus) + } + var ( + rlpCh = make(chan *numberRlp, threads*2) // we send raw rlp over this channel + hashesCh = make(chan *blockTxHashes, threads*2) // send hashes over hashesCh + ) + // lookup runs in one instance + lookup := func() { + n, end := from, to + if reverse { + n, end = to-1, from-1 + } + defer close(rlpCh) + for n != end { + data := ReadCanonicalBodyRLP(db, n) + // Feed the block to the aggregator, or abort on interrupt + select { + case rlpCh <- &numberRlp{n, data}: + case <-interrupt: + return + } + if reverse { + n-- + } else { + n++ + } + } + } + // process runs in parallel + nThreadsAlive := int32(threads) + process := func() { + defer func() { + // Last processor closes the result channel + if atomic.AddInt32(&nThreadsAlive, -1) == 0 { + close(hashesCh) + } + }() + for data := range rlpCh { + var body types.Body + if err := rlp.DecodeBytes(data.rlp, &body); err != nil { + log.Warn("Failed to decode block body", "block", data.number, "error", err) + return + } + var hashes []common.Hash + for _, tx := range body.Transactions { + hashes = append(hashes, tx.Hash()) + } + result := &blockTxHashes{ + hashes: hashes, + number: data.number, + } + // Feed the block to the aggregator, or abort on interrupt + select { + case hashesCh <- result: + case <-interrupt: + return + } + } + } + go lookup() // start the sequential db accessor + for i := 0; i < int(threads); i++ { + go process() + } + return hashesCh +} + +// indexTransactions creates txlookup indices of the specified block range. +// +// This function iterates canonical chain in reverse order, it has one main advantage: +// We can write tx index tail flag periodically even without the whole indexing +// procedure is finished. So that we can resume indexing procedure next time quickly. +// +// There is a passed channel, the whole procedure will be interrupted if any +// signal received. +func indexTransactions(db ethdb.Database, from uint64, to uint64, interrupt chan struct{}, hook func(uint64) bool) { + // short circuit for invalid range + if from >= to { + return + } + var ( + hashesCh = iterateTransactions(db, from, to, true, interrupt) + batch = db.NewBatch() + start = time.Now() + logged = start.Add(-7 * time.Second) + // Since we iterate in reverse, we expect the first number to come + // in to be [to-1]. Therefore, setting lastNum to means that the + // prqueue gap-evaluation will work correctly + lastNum = to + queue = prque.New(nil) + // for stats reporting + blocks, txs = 0, 0 + ) + for chanDelivery := range hashesCh { + // Push the delivery into the queue and process contiguous ranges. + // Since we iterate in reverse, so lower numbers have lower prio, and + // we can use the number directly as prio marker + queue.Push(chanDelivery, int64(chanDelivery.number)) + for !queue.Empty() { + // If the next available item is gapped, return + if _, priority := queue.Peek(); priority != int64(lastNum-1) { + break + } + // For testing + if hook != nil && !hook(lastNum-1) { + break + } + // Next block available, pop it off and index it + delivery := queue.PopItem().(*blockTxHashes) + lastNum = delivery.number + WriteTxLookupEntries(batch, delivery.number, delivery.hashes) + blocks++ + txs += len(delivery.hashes) + // If enough data was accumulated in memory or we're at the last block, dump to disk + if batch.ValueSize() > ethdb.IdealBatchSize { + WriteTxIndexTail(batch, lastNum) // Also write the tail here + if err := batch.Write(); err != nil { + log.Crit("Failed writing batch to db", "error", err) + return + } + batch.Reset() + } + // If we've spent too much time already, notify the user of what we're doing + if time.Since(logged) > 8*time.Second { + log.Info("Indexing transactions", "blocks", blocks, "txs", txs, "tail", lastNum, "total", to-from, "elapsed", common.PrettyDuration(time.Since(start))) + logged = time.Now() + } + } + } + // Flush the new indexing tail and the last committed data. It can also happen + // that the last batch is empty because nothing to index, but the tail has to + // be flushed anyway. + WriteTxIndexTail(batch, lastNum) + if err := batch.Write(); err != nil { + log.Crit("Failed writing batch to db", "error", err) + return + } + select { + case <-interrupt: + log.Debug("Transaction indexing interrupted", "blocks", blocks, "txs", txs, "tail", lastNum, "elapsed", common.PrettyDuration(time.Since(start))) + default: + log.Info("Indexed transactions", "blocks", blocks, "txs", txs, "tail", lastNum, "elapsed", common.PrettyDuration(time.Since(start))) + } +} + +// // IndexTransactions creates txlookup indices of the specified block range. The from +// // is included while to is excluded. +// // +// // This function iterates canonical chain in reverse order, it has one main advantage: +// // We can write tx index tail flag periodically even without the whole indexing +// // procedure is finished. So that we can resume indexing procedure next time quickly. +// // +// // There is a passed channel, the whole procedure will be interrupted if any +// // signal received. +// func IndexTransactions(db ethdb.Database, from uint64, to uint64, interrupt chan struct{}) { +// indexTransactions(db, from, to, interrupt, nil) +// } + +// indexTransactionsForTesting is the internal debug version with an additional hook. +func indexTransactionsForTesting(db ethdb.Database, from uint64, to uint64, interrupt chan struct{}, hook func(uint64) bool) { + indexTransactions(db, from, to, interrupt, hook) +} + +// unindexTransactions removes txlookup indices of the specified block range. +// +// There is a passed channel, the whole procedure will be interrupted if any +// signal received. +func unindexTransactions(db ethdb.Database, from uint64, to uint64, interrupt chan struct{}, hook func(uint64) bool) { + // short circuit for invalid range + if from >= to { + return + } + var ( + hashesCh = iterateTransactions(db, from, to, false, interrupt) + batch = db.NewBatch() + start = time.Now() + logged = start.Add(-7 * time.Second) + // we expect the first number to come in to be [from]. Therefore, setting + // nextNum to from means that the prqueue gap-evaluation will work correctly + nextNum = from + queue = prque.New(nil) + // for stats reporting + blocks, txs = 0, 0 + ) + // Otherwise spin up the concurrent iterator and unindexer + for delivery := range hashesCh { + // Push the delivery into the queue and process contiguous ranges. + queue.Push(delivery, -int64(delivery.number)) + for !queue.Empty() { + // If the next available item is gapped, return + if _, priority := queue.Peek(); -priority != int64(nextNum) { + break + } + // For testing + if hook != nil && !hook(nextNum) { + break + } + delivery := queue.PopItem().(*blockTxHashes) + nextNum = delivery.number + 1 + DeleteTxLookupEntries(batch, delivery.hashes) + txs += len(delivery.hashes) + blocks++ + + // If enough data was accumulated in memory or we're at the last block, dump to disk + // A batch counts the size of deletion as '1', so we need to flush more + // often than that. + if blocks%1000 == 0 { + WriteTxIndexTail(batch, nextNum) + if err := batch.Write(); err != nil { + log.Crit("Failed writing batch to db", "error", err) + return + } + batch.Reset() + } + // If we've spent too much time already, notify the user of what we're doing + if time.Since(logged) > 8*time.Second { + log.Info("Unindexing transactions", "blocks", blocks, "txs", txs, "total", to-from, "elapsed", common.PrettyDuration(time.Since(start))) + logged = time.Now() + } + } + } + // Flush the new indexing tail and the last committed data. It can also happen + // that the last batch is empty because nothing to unindex, but the tail has to + // be flushed anyway. + WriteTxIndexTail(batch, nextNum) + if err := batch.Write(); err != nil { + log.Crit("Failed writing batch to db", "error", err) + return + } + select { + case <-interrupt: + log.Debug("Transaction unindexing interrupted", "blocks", blocks, "txs", txs, "tail", to, "elapsed", common.PrettyDuration(time.Since(start))) + default: + log.Info("Unindexed transactions", "blocks", blocks, "txs", txs, "tail", to, "elapsed", common.PrettyDuration(time.Since(start))) + } +} + +// UnindexTransactions removes txlookup indices of the specified block range. +// The from is included while to is excluded. +// +// There is a passed channel, the whole procedure will be interrupted if any +// signal received. +func UnindexTransactions(db ethdb.Database, from uint64, to uint64, interrupt chan struct{}) { + unindexTransactions(db, from, to, interrupt, nil) +} + +// unindexTransactionsForTesting is the internal debug version with an additional hook. +func unindexTransactionsForTesting(db ethdb.Database, from uint64, to uint64, interrupt chan struct{}, hook func(uint64) bool) { + unindexTransactions(db, from, to, interrupt, hook) +} diff --git a/core/rawdb/chain_iterator_test.go b/core/rawdb/chain_iterator_test.go new file mode 100644 index 0000000000..0873dd3667 --- /dev/null +++ b/core/rawdb/chain_iterator_test.go @@ -0,0 +1,218 @@ +// (c) 2019-2022, Ava Labs, Inc. +// +// This file is a derived work, based on the go-ethereum library whose original +// notices appear below. +// +// It is distributed under a license compatible with the licensing terms of the +// original code from which it is derived. +// +// Much love to the original authors for their work. +// ********** +// Copyright 2020 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see . + +package rawdb + +import ( + "math/big" + "reflect" + "sort" + "sync" + "testing" + + "github.com/ava-labs/subnet-evm/core/types" + "github.com/ethereum/go-ethereum/common" +) + +func TestChainIterator(t *testing.T) { + // Construct test chain db + chainDb := NewMemoryDatabase() + + var block *types.Block + var txs []*types.Transaction + to := common.BytesToAddress([]byte{0x11}) + block = types.NewBlock(&types.Header{Number: big.NewInt(int64(0))}, nil, nil, nil, newHasher()) // Empty genesis block + WriteBlock(chainDb, block) + WriteCanonicalHash(chainDb, block.Hash(), block.NumberU64()) + for i := uint64(1); i <= 10; i++ { + var tx *types.Transaction + if i%2 == 0 { + tx = types.NewTx(&types.LegacyTx{ + Nonce: i, + GasPrice: big.NewInt(11111), + Gas: 1111, + To: &to, + Value: big.NewInt(111), + Data: []byte{0x11, 0x11, 0x11}, + }) + } else { + tx = types.NewTx(&types.AccessListTx{ + ChainID: big.NewInt(1337), + Nonce: i, + GasPrice: big.NewInt(11111), + Gas: 1111, + To: &to, + Value: big.NewInt(111), + Data: []byte{0x11, 0x11, 0x11}, + }) + } + txs = append(txs, tx) + block = types.NewBlock(&types.Header{Number: big.NewInt(int64(i))}, []*types.Transaction{tx}, nil, nil, newHasher()) + WriteBlock(chainDb, block) + WriteCanonicalHash(chainDb, block.Hash(), block.NumberU64()) + } + + cases := []struct { + from, to uint64 + reverse bool + expect []int + }{ + {0, 11, true, []int{10, 9, 8, 7, 6, 5, 4, 3, 2, 1, 0}}, + {0, 0, true, nil}, + {0, 5, true, []int{4, 3, 2, 1, 0}}, + {10, 11, true, []int{10}}, + {0, 11, false, []int{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10}}, + {0, 0, false, nil}, + {10, 11, false, []int{10}}, + } + for i, c := range cases { + var numbers []int + hashCh := iterateTransactions(chainDb, c.from, c.to, c.reverse, nil) + if hashCh != nil { + for h := range hashCh { + numbers = append(numbers, int(h.number)) + if len(h.hashes) > 0 { + if got, exp := h.hashes[0], txs[h.number-1].Hash(); got != exp { + t.Fatalf("block %d: hash wrong, got %x exp %x", h.number, got, exp) + } + } + } + } + if !c.reverse { + sort.Ints(numbers) + } else { + sort.Sort(sort.Reverse(sort.IntSlice(numbers))) + } + if !reflect.DeepEqual(numbers, c.expect) { + t.Fatalf("Case %d failed, visit element mismatch, want %v, got %v", i, c.expect, numbers) + } + } +} + +func TestIndexTransactions(t *testing.T) { + // Construct test chain db + chainDb := NewMemoryDatabase() + + var block *types.Block + var txs []*types.Transaction + to := common.BytesToAddress([]byte{0x11}) + + // Write empty genesis block + block = types.NewBlock(&types.Header{Number: big.NewInt(int64(0))}, nil, nil, nil, newHasher()) + WriteBlock(chainDb, block) + WriteCanonicalHash(chainDb, block.Hash(), block.NumberU64()) + + for i := uint64(1); i <= 10; i++ { + var tx *types.Transaction + if i%2 == 0 { + tx = types.NewTx(&types.LegacyTx{ + Nonce: i, + GasPrice: big.NewInt(11111), + Gas: 1111, + To: &to, + Value: big.NewInt(111), + Data: []byte{0x11, 0x11, 0x11}, + }) + } else { + tx = types.NewTx(&types.AccessListTx{ + ChainID: big.NewInt(1337), + Nonce: i, + GasPrice: big.NewInt(11111), + Gas: 1111, + To: &to, + Value: big.NewInt(111), + Data: []byte{0x11, 0x11, 0x11}, + }) + } + txs = append(txs, tx) + block = types.NewBlock(&types.Header{Number: big.NewInt(int64(i))}, []*types.Transaction{tx}, nil, nil, newHasher()) + WriteBlock(chainDb, block) + WriteCanonicalHash(chainDb, block.Hash(), block.NumberU64()) + } + // verify checks whether the tx indices in the range [from, to) + // is expected. + verify := func(from, to int, exist bool, tail uint64) { + for i := from; i < to; i++ { + if i == 0 { + continue + } + number := ReadTxLookupEntry(chainDb, txs[i-1].Hash()) + if exist && number == nil { + t.Fatalf("Transaction index %d missing", i) + } + if !exist && number != nil { + t.Fatalf("Transaction index %d is not deleted", i) + } + } + number := ReadTxIndexTail(chainDb) + if number == nil || *number != tail { + t.Fatalf("Transaction tail mismatch") + } + } + indexTransactionsForTesting(chainDb, 5, 11, nil, nil) + verify(5, 11, true, 5) + verify(0, 5, false, 5) + + indexTransactionsForTesting(chainDb, 0, 5, nil, nil) + verify(0, 11, true, 0) + + UnindexTransactions(chainDb, 0, 5, nil) + verify(5, 11, true, 5) + verify(0, 5, false, 5) + + UnindexTransactions(chainDb, 5, 11, nil) + verify(0, 11, false, 11) + + // Testing corner cases + signal := make(chan struct{}) + var once sync.Once + indexTransactionsForTesting(chainDb, 5, 11, signal, func(n uint64) bool { + if n <= 8 { + once.Do(func() { + close(signal) + }) + return false + } + return true + }) + verify(9, 11, true, 9) + verify(0, 9, false, 9) + indexTransactionsForTesting(chainDb, 0, 9, nil, nil) + + signal = make(chan struct{}) + var once2 sync.Once + unindexTransactionsForTesting(chainDb, 0, 11, signal, func(n uint64) bool { + if n >= 8 { + once2.Do(func() { + close(signal) + }) + return false + } + return true + }) + verify(8, 11, true, 8) + verify(0, 8, false, 8) +} diff --git a/core/rawdb/database.go b/core/rawdb/database.go index 51c3da8065..dc4dd85688 100644 --- a/core/rawdb/database.go +++ b/core/rawdb/database.go @@ -208,7 +208,7 @@ func InspectDatabase(db ethdb.Database, keyPrefix, keyStart []byte) error { for _, meta := range [][]byte{ databaseVersionKey, headHeaderKey, headBlockKey, snapshotRootKey, snapshotBlockHashKey, snapshotGeneratorKey, - uncleanShutdownKey, syncRootKey, + uncleanShutdownKey, syncRootKey, txIndexTailKey, } { if bytes.Equal(key, meta) { metadata.Add(size) diff --git a/core/rawdb/schema.go b/core/rawdb/schema.go index 4eb3382a84..bb0956abdf 100644 --- a/core/rawdb/schema.go +++ b/core/rawdb/schema.go @@ -56,6 +56,9 @@ var ( // snapshotGeneratorKey tracks the snapshot generation marker across restarts. snapshotGeneratorKey = []byte("SnapshotGenerator") + // txIndexTailKey tracks the oldest block whose transactions have been indexed. + txIndexTailKey = []byte("TransactionIndexTail") + // uncleanShutdownKey tracks the list of local crashes uncleanShutdownKey = []byte("unclean-shutdown") // config prefix for the db diff --git a/eth/backend.go b/eth/backend.go index 7e9aaebd84..958460a359 100644 --- a/eth/backend.go +++ b/eth/backend.go @@ -215,6 +215,7 @@ func New( SkipSnapshotRebuild: config.SkipSnapshotRebuild, Preimages: config.Preimages, AcceptedCacheSize: config.AcceptedCacheSize, + TxLookupLimit: config.TxLookupLimit, } ) diff --git a/eth/ethconfig/config.go b/eth/ethconfig/config.go index c1be8c0ead..98a16165a7 100644 --- a/eth/ethconfig/config.go +++ b/eth/ethconfig/config.go @@ -149,4 +149,10 @@ type Config struct { // their node before the network upgrade and their node accepts blocks that have // identical state with the pre-upgrade ruleset. SkipUpgradeCheck bool + + // TxLookupLimit is the maximum number of blocks from head whose tx indices + // are reserved: + // * 0: means no limit + // * N: means N block limit [HEAD-N+1, HEAD] and delete extra indexes + TxLookupLimit uint64 } diff --git a/eth/filters/bench_test.go b/eth/filters/bench_test.go index e9a55ae102..7ba3381f2f 100644 --- a/eth/filters/bench_test.go +++ b/eth/filters/bench_test.go @@ -73,7 +73,7 @@ const benchFilterCnt = 2000 func benchmarkBloomBits(b *testing.B, sectionSize uint64) { b.Skip("test disabled: this tests presume (and modify) an existing datadir.") - benchDataDir := b.TempDir() + "/coreth/chaindata" + benchDataDir := b.TempDir() + "/subnet-evm/chaindata" b.Log("Running bloombits benchmark section size:", sectionSize) db, err := rawdb.NewLevelDBDatabase(benchDataDir, 128, 1024, "", false) @@ -173,7 +173,7 @@ func clearBloomBits(db ethdb.Database) { func BenchmarkNoBloomBits(b *testing.B) { b.Skip("test disabled: this tests presume (and modify) an existing datadir.") - benchDataDir := b.TempDir() + "/coreth/chaindata" + benchDataDir := b.TempDir() + "/subnet-evm/chaindata" b.Log("Running benchmark without bloombits") db, err := rawdb.NewLevelDBDatabase(benchDataDir, 128, 1024, "", false) if err != nil { diff --git a/peer/client.go b/peer/client.go index 31fe2175a9..6a002d0e38 100644 --- a/peer/client.go +++ b/peer/client.go @@ -19,15 +19,19 @@ var ( // NetworkClient defines ability to send request / response through the Network type NetworkClient interface { - // RequestAny synchronously sends request to a randomly chosen peer with a + // SendAppRequestAny synchronously sends request to an arbitrary peer with a // node version greater than or equal to minVersion. // Returns response bytes, the ID of the chosen peer, and ErrRequestFailed if // the request should be retried. - RequestAny(minVersion *version.Application, request []byte) ([]byte, ids.NodeID, error) + SendAppRequestAny(minVersion *version.Application, request []byte) ([]byte, ids.NodeID, error) - // Request synchronously sends request to the selected nodeID + // SendAppRequest synchronously sends request to the selected nodeID // Returns response bytes, and ErrRequestFailed if the request should be retried. - Request(nodeID ids.NodeID, request []byte) ([]byte, error) + SendAppRequest(nodeID ids.NodeID, request []byte) ([]byte, error) + + // SendCrossChainRequest sends a request to a specific blockchain running on this node. + // Returns response bytes, and ErrRequestFailed if the request failed. + SendCrossChainRequest(chainID ids.ID, request []byte) ([]byte, error) // Gossip sends given gossip message to peers Gossip(gossip []byte) error @@ -51,13 +55,13 @@ func NewNetworkClient(network Network) NetworkClient { } } -// RequestAny synchronously sends request to a randomly chosen peer with a +// SendAppRequestAny synchronously sends request to an arbitrary peer with a // node version greater than or equal to minVersion. // Returns response bytes, the ID of the chosen peer, and ErrRequestFailed if // the request should be retried. -func (c *client) RequestAny(minVersion *version.Application, request []byte) ([]byte, ids.NodeID, error) { +func (c *client) SendAppRequestAny(minVersion *version.Application, request []byte) ([]byte, ids.NodeID, error) { waitingHandler := newWaitingResponseHandler() - nodeID, err := c.network.RequestAny(minVersion, request, waitingHandler) + nodeID, err := c.network.SendAppRequestAny(minVersion, request, waitingHandler) if err != nil { return nil, nodeID, err } @@ -68,11 +72,25 @@ func (c *client) RequestAny(minVersion *version.Application, request []byte) ([] return response, nodeID, nil } -// Request synchronously sends request to the specified nodeID +// SendAppRequest synchronously sends request to the specified nodeID +// Returns response bytes and ErrRequestFailed if the request should be retried. +func (c *client) SendAppRequest(nodeID ids.NodeID, request []byte) ([]byte, error) { + waitingHandler := newWaitingResponseHandler() + if err := c.network.SendAppRequest(nodeID, request, waitingHandler); err != nil { + return nil, err + } + response := <-waitingHandler.responseChan + if waitingHandler.failed { + return nil, ErrRequestFailed + } + return response, nil +} + +// SendCrossChainRequest synchronously sends request to the specified chainID // Returns response bytes and ErrRequestFailed if the request should be retried. -func (c *client) Request(nodeID ids.NodeID, request []byte) ([]byte, error) { +func (c *client) SendCrossChainRequest(chainID ids.ID, request []byte) ([]byte, error) { waitingHandler := newWaitingResponseHandler() - if err := c.network.Request(nodeID, request, waitingHandler); err != nil { + if err := c.network.SendCrossChainRequest(chainID, request, waitingHandler); err != nil { return nil, err } response := <-waitingHandler.responseChan diff --git a/peer/network.go b/peer/network.go index 377235dad3..6d0ebe5990 100644 --- a/peer/network.go +++ b/peer/network.go @@ -30,6 +30,7 @@ const minRequestHandlingDuration = 100 * time.Millisecond var ( errAcquiringSemaphore = errors.New("error acquiring semaphore") + errExpiredRequest = errors.New("expired request") _ Network = &network{} _ validators.Connector = &network{} _ common.AppHandler = &network{} @@ -39,18 +40,21 @@ type Network interface { validators.Connector common.AppHandler - // RequestAny synchronously sends request to a randomly chosen peer with a + // SendAppRequestAny synchronously sends request to an arbitrary peer with a // node version greater than or equal to minVersion. // Returns the ID of the chosen peer, and an error if the request could not // be sent to a peer with the desired [minVersion]. - RequestAny(minVersion *version.Application, message []byte, handler message.ResponseHandler) (ids.NodeID, error) + SendAppRequestAny(minVersion *version.Application, message []byte, handler message.ResponseHandler) (ids.NodeID, error) - // Request sends message to given nodeID, notifying handler when there's a response or timeout - Request(nodeID ids.NodeID, message []byte, handler message.ResponseHandler) error + // SendAppRequest sends message to given nodeID, notifying handler when there's a response or timeout + SendAppRequest(nodeID ids.NodeID, message []byte, handler message.ResponseHandler) error // Gossip sends given gossip message to peers Gossip(gossip []byte) error + // SendCrossChainRequest sends a message to given chainID notifying handler when there's a response or timeout + SendCrossChainRequest(chainID ids.ID, message []byte, handler message.ResponseHandler) error + // Shutdown stops all peer channel listeners and marks the node to have stopped // n.Start() can be called again but the peers will have to be reconnected // by calling OnPeerConnected for each peer @@ -62,6 +66,9 @@ type Network interface { // SetRequestHandler sets the provided request handler as the request handler SetRequestHandler(handler message.RequestHandler) + // SetCrossChainHandler sets the provided cross chain request handler as the cross chain request handler + SetCrossChainRequestHandler(handler message.CrossChainRequestHandler) + // Size returns the size of the network in number of connected peers Size() uint32 @@ -77,74 +84,82 @@ type network struct { self ids.NodeID // NodeID of this node requestIDGen uint32 // requestID counter used to track outbound requests outstandingRequestHandlers map[uint32]message.ResponseHandler // maps avalanchego requestID => message.ResponseHandler - activeRequests *semaphore.Weighted // controls maximum number of active outbound requests + activeAppRequests *semaphore.Weighted // controls maximum number of active outbound requests + activeCrossChainRequests *semaphore.Weighted // controls maximum number of active outbound cross chain requests appSender common.AppSender // avalanchego AppSender for sending messages codec codec.Manager // Codec used for parsing messages - requestHandler message.RequestHandler // maps request type => handler + crossChainCodec codec.Manager // Codec used for parsing cross chain messages + appRequestHandler message.RequestHandler // maps request type => handler + crossChainRequestHandler message.CrossChainRequestHandler // maps cross chain request type => handler gossipHandler message.GossipHandler // maps gossip type => handler peers *peerTracker // tracking of peers & bandwidth - stats stats.RequestHandlerStats // Provide request handler metrics + appStats stats.RequestHandlerStats // Provide request handler metrics + crossChainStats stats.RequestHandlerStats // Provide cross chain request handler metrics } -func NewNetwork(appSender common.AppSender, codec codec.Manager, self ids.NodeID, maxActiveRequests int64) Network { +func NewNetwork(appSender common.AppSender, codec codec.Manager, crossChainCodec codec.Manager, self ids.NodeID, maxActiveAppRequests int64, maxActiveCrossChainRequests int64) Network { return &network{ appSender: appSender, codec: codec, + crossChainCodec: crossChainCodec, self: self, outstandingRequestHandlers: make(map[uint32]message.ResponseHandler), - activeRequests: semaphore.NewWeighted(maxActiveRequests), + activeAppRequests: semaphore.NewWeighted(maxActiveAppRequests), + activeCrossChainRequests: semaphore.NewWeighted(maxActiveCrossChainRequests), gossipHandler: message.NoopMempoolGossipHandler{}, - requestHandler: message.NoopRequestHandler{}, + appRequestHandler: message.NoopRequestHandler{}, + crossChainRequestHandler: message.NoopCrossChainRequestHandler{}, peers: NewPeerTracker(), - stats: stats.NewRequestHandlerStats(), + appStats: stats.NewRequestHandlerStats(), + crossChainStats: stats.NewCrossChainRequestHandlerStats(), } } -// RequestAny synchronously sends request to a randomly chosen peer with a +// SendAppRequestAny synchronously sends request to an arbitrary peer with a // node version greater than or equal to minVersion. If minVersion is nil, // the request will be sent to any peer regardless of their version. // Returns the ID of the chosen peer, and an error if the request could not // be sent to a peer with the desired [minVersion]. -func (n *network) RequestAny(minVersion *version.Application, request []byte, handler message.ResponseHandler) (ids.NodeID, error) { - // Take a slot from total [activeRequests] and block until a slot becomes available. - if err := n.activeRequests.Acquire(context.Background(), 1); err != nil { +func (n *network) SendAppRequestAny(minVersion *version.Application, request []byte, handler message.ResponseHandler) (ids.NodeID, error) { + // Take a slot from total [activeAppRequests] and block until a slot becomes available. + if err := n.activeAppRequests.Acquire(context.Background(), 1); err != nil { return ids.EmptyNodeID, errAcquiringSemaphore } n.lock.Lock() defer n.lock.Unlock() if nodeID, ok := n.peers.GetAnyPeer(minVersion); ok { - return nodeID, n.request(nodeID, request, handler) + return nodeID, n.sendAppRequest(nodeID, request, handler) } - n.activeRequests.Release(1) + n.activeAppRequests.Release(1) return ids.EmptyNodeID, fmt.Errorf("no peers found matching version %s out of %d peers", minVersion, n.peers.Size()) } -// Request sends request message bytes to specified nodeID, notifying the responseHandler on response or failure -func (n *network) Request(nodeID ids.NodeID, request []byte, responseHandler message.ResponseHandler) error { +// SendAppRequest sends request message bytes to specified nodeID, notifying the responseHandler on response or failure +func (n *network) SendAppRequest(nodeID ids.NodeID, request []byte, responseHandler message.ResponseHandler) error { if nodeID == ids.EmptyNodeID { return fmt.Errorf("cannot send request to empty nodeID, nodeID=%s, requestLen=%d", nodeID, len(request)) } - // Take a slot from total [activeRequests] and block until a slot becomes available. - if err := n.activeRequests.Acquire(context.Background(), 1); err != nil { + // Take a slot from total [activeAppRequests] and block until a slot becomes available. + if err := n.activeAppRequests.Acquire(context.Background(), 1); err != nil { return errAcquiringSemaphore } n.lock.Lock() defer n.lock.Unlock() - return n.request(nodeID, request, responseHandler) + return n.sendAppRequest(nodeID, request, responseHandler) } -// request sends request message bytes to specified nodeID and adds [responseHandler] to [outstandingRequestHandlers] +// sendAppRequest sends request message bytes to specified nodeID and adds [responseHandler] to [outstandingRequestHandlers] // so that it can be invoked when the network receives either a response or failure message. // Assumes [nodeID] is never [self] since we guarantee [self] will not be added to the [peers] map. // Releases active requests semaphore if there was an error in sending the request // Returns an error if [appSender] is unable to make the request. // Assumes write lock is held -func (n *network) request(nodeID ids.NodeID, request []byte, responseHandler message.ResponseHandler) error { +func (n *network) sendAppRequest(nodeID ids.NodeID, request []byte, responseHandler message.ResponseHandler) error { log.Debug("sending request to peer", "nodeID", nodeID, "requestLen", len(request)) n.peers.TrackPeer(nodeID) @@ -157,12 +172,10 @@ func (n *network) request(nodeID ids.NodeID, request []byte, responseHandler mes nodeIDs := set.NewSet[ids.NodeID](1) nodeIDs.Add(nodeID) - // send app request to the peer - // on failure: release the activeRequests slot, mark message as processed and return fatal error // Send app request to [nodeID]. - // On failure, release the slot from active requests and [outstandingRequestHandlers]. + // On failure, release the slot from [activeAppRequests] and delete request from [outstandingRequestHandlers] if err := n.appSender.SendAppRequest(context.TODO(), nodeIDs, requestID, request); err != nil { - n.activeRequests.Release(1) + n.activeAppRequests.Release(1) delete(n.outstandingRequestHandlers, requestID) return err } @@ -171,16 +184,116 @@ func (n *network) request(nodeID ids.NodeID, request []byte, responseHandler mes return nil } -func (n *network) CrossChainAppRequest(_ context.Context, requestingChainID ids.ID, requestID uint32, deadline time.Time, request []byte) error { +// SendCrossChainRequest sends request message bytes to specified chainID and adds [handler] to [outstandingRequestHandlers] +// so that it can be invoked when the network receives either a response or failure message. +// Returns an error if [appSender] is unable to make the request. +func (n *network) SendCrossChainRequest(chainID ids.ID, request []byte, handler message.ResponseHandler) error { + // Take a slot from total [activeCrossChainRequests] and block until a slot becomes available. + if err := n.activeCrossChainRequests.Acquire(context.Background(), 1); err != nil { + return errAcquiringSemaphore + } + + n.lock.Lock() + defer n.lock.Unlock() + + // generate requestID + requestID := n.requestIDGen + n.requestIDGen++ + + n.outstandingRequestHandlers[requestID] = handler + + // Send cross chain request to [chainID]. + // On failure, release the slot from [activeCrossChainRequests] and delete request from [outstandingRequestHandlers]. + if err := n.appSender.SendCrossChainAppRequest(context.TODO(), chainID, requestID, request); err != nil { + n.activeCrossChainRequests.Release(1) + delete(n.outstandingRequestHandlers, requestID) + return err + } + + log.Debug("sent request message to chain", "chainID", chainID, "crossChainRequestID", requestID) return nil } -func (n *network) CrossChainAppRequestFailed(_ context.Context, respondingChainID ids.ID, requestID uint32) error { - return nil +// CrossChainAppRequest notifies the VM when another chain in the network requests for data. +// Send a CrossChainAppResponse to [chainID] in response to a valid message using the same +// [requestID] before the deadline. +func (n *network) CrossChainAppRequest(ctx context.Context, requestingChainID ids.ID, requestID uint32, deadline time.Time, request []byte) error { + log.Debug("received CrossChainAppRequest from chain", "requestingChainID", requestingChainID, "requestID", requestID, "requestLen", len(request)) + + var req message.CrossChainRequest + if _, err := n.crossChainCodec.Unmarshal(request, &req); err != nil { + log.Debug("failed to unmarshal CrossChainAppRequest", "requestingChainID", requestingChainID, "requestID", requestID, "requestLen", len(request), "err", err) + return nil + } + + bufferedDeadline, err := calculateTimeUntilDeadline(deadline, n.crossChainStats) + if err != nil { + log.Debug("deadline to process CrossChainAppRequest has expired, skipping", "requestingChainID", requestingChainID, "requestID", requestID, "err", err) + return nil + } + + log.Debug("processing incoming CrossChainAppRequest", "requestingChainID", requestingChainID, "requestID", requestID, "req", req) + handleCtx, cancel := context.WithDeadline(context.Background(), bufferedDeadline) + defer cancel() + + responseBytes, err := req.Handle(handleCtx, requestingChainID, requestID, n.crossChainRequestHandler) + switch { + case err != nil && err != context.DeadlineExceeded: + return err // Return a fatal error + case responseBytes != nil: + return n.appSender.SendCrossChainAppResponse(ctx, requestingChainID, requestID, responseBytes) // Propagate fatal error + default: + return nil + } } -func (n *network) CrossChainAppResponse(_ context.Context, respondingChainID ids.ID, requestID uint32, response []byte) error { - return nil +// CrossChainAppRequestFailed can be called by the avalanchego -> VM in following cases: +// - respondingChain doesn't exist +// - invalid CrossChainAppResponse from respondingChain +// - invalid CrossChainRequest was sent to respondingChain +// - request times out before a response is provided +// If [requestID] is not known, this function will emit a log and return a nil error. +// If the response handler returns an error it is propagated as a fatal error. +func (n *network) CrossChainAppRequestFailed(ctx context.Context, respondingChainID ids.ID, requestID uint32) error { + n.lock.Lock() + defer n.lock.Unlock() + + log.Debug("received CrossChainAppRequestFailed from chain", "respondingChainID", respondingChainID, "requestID", requestID) + + handler, exists := n.markRequestFulfilled(requestID) + if !exists { + // Should never happen since the engine should be managing outstanding requests + log.Error("received CrossChainAppRequestFailed to unknown request", "respondingChainID", respondingChainID, "requestID", requestID) + return nil + } + + // We must release the slot + n.activeCrossChainRequests.Release(1) + + return handler.OnFailure() +} + +// CrossChainAppResponse is invoked when there is a +// response received from [respondingChainID] regarding a request the VM sent out +// If [requestID] is not known, this function will emit a log and return a nil error. +// If the response handler returns an error it is propagated as a fatal error. +func (n *network) CrossChainAppResponse(ctx context.Context, respondingChainID ids.ID, requestID uint32, response []byte) error { + n.lock.Lock() + defer n.lock.Unlock() + + log.Debug("received CrossChainAppResponse from responding chain", "respondingChainID", respondingChainID, "requestID", requestID) + + handler, exists := n.markRequestFulfilled(requestID) + if !exists { + // Should never happen since the engine should be managing outstanding requests + log.Error("received CrossChainAppResponse to unknown request", "respondingChainID", respondingChainID, "requestID", requestID, "responseLen", len(response)) + return nil + } + + // We must release the slot + n.activeCrossChainRequests.Release(1) + + return handler.OnResponse(response) } // AppRequest is called by avalanchego -> VM when there is an incoming AppRequest from a peer @@ -189,9 +302,6 @@ func (n *network) CrossChainAppResponse(_ context.Context, respondingChainID ids // sends a response back to the sender if length of response returned by the handler is >0 // expects the deadline to not have been passed func (n *network) AppRequest(ctx context.Context, nodeID ids.NodeID, requestID uint32, deadline time.Time, request []byte) error { - n.lock.RLock() - defer n.lock.RUnlock() - log.Debug("received AppRequest from node", "nodeID", nodeID, "requestID", requestID, "requestLen", len(request)) var req message.Request @@ -200,20 +310,9 @@ func (n *network) AppRequest(ctx context.Context, nodeID ids.NodeID, requestID u return nil } - // calculate how much time is left until the deadline - timeTillDeadline := time.Until(deadline) - n.stats.UpdateTimeUntilDeadline(timeTillDeadline) - - // bufferedDeadline is half the time till actual deadline so that the message has a reasonable chance - // of completing its processing and sending the response to the peer. - timeTillDeadline = time.Duration(timeTillDeadline.Nanoseconds() / 2) - bufferedDeadline := time.Now().Add(timeTillDeadline) - - // check if we have enough time to handle this request - if time.Until(bufferedDeadline) < minRequestHandlingDuration { - // Drop the request if we already missed the deadline to respond. - log.Debug("deadline to process AppRequest has expired, skipping", "nodeID", nodeID, "requestID", requestID, "req", req) - n.stats.IncDeadlineDroppedRequest() + bufferedDeadline, err := calculateTimeUntilDeadline(deadline, n.appStats) + if err != nil { + log.Debug("deadline to process AppRequest has expired, skipping", "nodeID", nodeID, "requestID", requestID, "err", err) return nil } @@ -223,7 +322,7 @@ func (n *network) AppRequest(ctx context.Context, nodeID ids.NodeID, requestID u handleCtx, cancel := context.WithDeadline(context.Background(), bufferedDeadline) defer cancel() - responseBytes, err := req.Handle(handleCtx, nodeID, requestID, n.requestHandler) + responseBytes, err := req.Handle(handleCtx, nodeID, requestID, n.appRequestHandler) switch { case err != nil && err != context.DeadlineExceeded: return err // Return a fatal error @@ -244,48 +343,77 @@ func (n *network) AppResponse(_ context.Context, nodeID ids.NodeID, requestID ui log.Debug("received AppResponse from peer", "nodeID", nodeID, "requestID", requestID) - handler, exists := n.getRequestHandler(requestID) + handler, exists := n.markRequestFulfilled(requestID) if !exists { // Should never happen since the engine should be managing outstanding requests - log.Error("received response to unknown request", "nodeID", nodeID, "requestID", requestID, "responseLen", len(response)) + log.Error("received AppResponse to unknown request", "nodeID", nodeID, "requestID", requestID, "responseLen", len(response)) return nil } - return handler.OnResponse(nodeID, requestID, response) + // We must release the slot + n.activeAppRequests.Release(1) + + return handler.OnResponse(response) } // AppRequestFailed can be called by the avalanchego -> VM in following cases: // - node is benched // - failed to send message to [nodeID] due to a network issue -// - timeout +// - request times out before a response is provided // error returned by this function is expected to be treated as fatal by the engine // returns error only when the response handler returns an error func (n *network) AppRequestFailed(_ context.Context, nodeID ids.NodeID, requestID uint32) error { n.lock.Lock() defer n.lock.Unlock() + log.Debug("received AppRequestFailed from peer", "nodeID", nodeID, "requestID", requestID) - handler, exists := n.getRequestHandler(requestID) + handler, exists := n.markRequestFulfilled(requestID) if !exists { // Should never happen since the engine should be managing outstanding requests - log.Error("received request failed to unknown request", "nodeID", nodeID, "requestID", requestID) + log.Error("received AppRequestFailed to unknown request", "nodeID", nodeID, "requestID", requestID) return nil } - return handler.OnFailure(nodeID, requestID) + // We must release the slot + n.activeAppRequests.Release(1) + + return handler.OnFailure() +} + +// calculateTimeUntilDeadline calculates the time until deadline and drops it if we missed he deadline to response. +// This function updates metrics for both app requests and cross chain requests. +// This is called by either [AppRequest] or [CrossChainAppRequest]. +func calculateTimeUntilDeadline(deadline time.Time, stats stats.RequestHandlerStats) (time.Time, error) { + // calculate how much time is left until the deadline + timeTillDeadline := time.Until(deadline) + stats.UpdateTimeUntilDeadline(timeTillDeadline) + + // bufferedDeadline is half the time till actual deadline so that the message has a reasonable chance + // of completing its processing and sending the response to the peer. + bufferedDeadline := time.Now().Add(timeTillDeadline / 2) + + // check if we have enough time to handle this request + if time.Until(bufferedDeadline) < minRequestHandlingDuration { + // Drop the request if we already missed the deadline to respond. + stats.IncDeadlineDroppedRequest() + return time.Time{}, errExpiredRequest + } + + return bufferedDeadline, nil } -// getRequestHandler fetches the handler for [requestID] and marks the request with [requestID] as having been fulfilled. +// markRequestFulfilled fetches the handler for [requestID] and marks the request with [requestID] as having been fulfilled. // This is called by either [AppResponse] or [AppRequestFailed]. -// assumes that the write lock is held. -func (n *network) getRequestHandler(requestID uint32) (message.ResponseHandler, bool) { +// Assumes that the write lock is held. +func (n *network) markRequestFulfilled(requestID uint32) (message.ResponseHandler, bool) { handler, exists := n.outstandingRequestHandlers[requestID] if !exists { return nil, false } - // mark message as processed, release activeRequests slot + // mark message as processed delete(n.outstandingRequestHandlers, requestID) - n.activeRequests.Release(1) + return handler, true } @@ -354,7 +482,14 @@ func (n *network) SetRequestHandler(handler message.RequestHandler) { n.lock.Lock() defer n.lock.Unlock() - n.requestHandler = handler + n.appRequestHandler = handler +} + +func (n *network) SetCrossChainRequestHandler(handler message.CrossChainRequestHandler) { + n.lock.Lock() + defer n.lock.Unlock() + + n.crossChainRequestHandler = handler } func (n *network) Size() uint32 { diff --git a/peer/network_test.go b/peer/network_test.go index c7ba5187f9..e589a48508 100644 --- a/peer/network_test.go +++ b/peer/network_test.go @@ -22,6 +22,8 @@ import ( "github.com/ava-labs/avalanchego/ids" "github.com/ava-labs/avalanchego/version" "github.com/stretchr/testify/assert" + + ethcommon "github.com/ethereum/go-ethereum/common" ) var ( @@ -43,11 +45,14 @@ var ( _ common.AppSender = testAppSender{} _ message.GossipMessage = HelloGossip{} _ message.GossipHandler = &testGossipHandler{} + + _ message.CrossChainRequest = &ExampleCrossChainRequest{} + _ message.CrossChainRequestHandler = &testCrossChainHandler{} ) func TestNetworkDoesNotConnectToItself(t *testing.T) { selfNodeID := ids.GenerateTestNodeID() - n := NewNetwork(nil, nil, selfNodeID, 1) + n := NewNetwork(nil, nil, nil, selfNodeID, 1, 1) assert.NoError(t, n.Connected(context.Background(), selfNodeID, defaultPeerVersion)) assert.EqualValues(t, 0, n.Size()) } @@ -82,7 +87,8 @@ func TestRequestAnyRequestsRoutingAndResponse(t *testing.T) { } codecManager := buildCodec(t, HelloRequest{}, HelloResponse{}) - net = NewNetwork(sender, codecManager, ids.EmptyNodeID, 16) + crossChainCodecManager := buildCodec(t, ExampleCrossChainRequest{}, ExampleCrossChainResponse{}) + net = NewNetwork(sender, codecManager, crossChainCodecManager, ids.EmptyNodeID, 16, 16) net.SetRequestHandler(&HelloGreetingRequestHandler{codec: codecManager}) client := NewNetworkClient(net) nodeID := ids.GenerateTestNodeID() @@ -104,7 +110,7 @@ func TestRequestAnyRequestsRoutingAndResponse(t *testing.T) { defer wg.Done() requestBytes, err := message.RequestToBytes(codecManager, requestMessage) assert.NoError(t, err) - responseBytes, _, err := client.RequestAny(defaultPeerVersion, requestBytes) + responseBytes, _, err := client.SendAppRequestAny(defaultPeerVersion, requestBytes) assert.NoError(t, err) assert.NotNil(t, responseBytes) @@ -156,7 +162,8 @@ func TestRequestRequestsRoutingAndResponse(t *testing.T) { } codecManager := buildCodec(t, HelloRequest{}, HelloResponse{}) - net = NewNetwork(sender, codecManager, ids.EmptyNodeID, 16) + crossChainCodecManager := buildCodec(t, ExampleCrossChainRequest{}, ExampleCrossChainResponse{}) + net = NewNetwork(sender, codecManager, crossChainCodecManager, ids.EmptyNodeID, 16, 16) net.SetRequestHandler(&HelloGreetingRequestHandler{codec: codecManager}) client := NewNetworkClient(net) @@ -188,7 +195,7 @@ func TestRequestRequestsRoutingAndResponse(t *testing.T) { defer wg.Done() requestBytes, err := message.RequestToBytes(codecManager, requestMessage) assert.NoError(t, err) - responseBytes, err := client.Request(nodeID, requestBytes) + responseBytes, err := client.SendAppRequest(nodeID, requestBytes) assert.NoError(t, err) assert.NotNil(t, responseBytes) @@ -210,7 +217,7 @@ func TestRequestRequestsRoutingAndResponse(t *testing.T) { } // ensure empty nodeID is not allowed - _, err := client.Request(ids.EmptyNodeID, []byte("hello there")) + _, err := client.SendAppRequest(ids.EmptyNodeID, []byte("hello there")) assert.Error(t, err) assert.Contains(t, err.Error(), "cannot send request to empty nodeID") } @@ -242,7 +249,8 @@ func TestRequestMinVersion(t *testing.T) { } // passing nil as codec works because the net.AppRequest is never called - net = NewNetwork(sender, codecManager, ids.EmptyNodeID, 1) + crossChainCodecManager := buildCodec(t, ExampleCrossChainRequest{}, ExampleCrossChainResponse{}) + net = NewNetwork(sender, codecManager, crossChainCodecManager, ids.EmptyNodeID, 1, 16) client := NewNetworkClient(net) requestMessage := TestMessage{Message: "this is a request"} requestBytes, err := message.RequestToBytes(codecManager, requestMessage) @@ -260,7 +268,7 @@ func TestRequestMinVersion(t *testing.T) { ) // ensure version does not match - responseBytes, _, err := client.RequestAny( + responseBytes, _, err := client.SendAppRequestAny( &version.Application{ Major: 2, Minor: 0, @@ -272,7 +280,7 @@ func TestRequestMinVersion(t *testing.T) { assert.Nil(t, responseBytes) // ensure version matches and the request goes through - responseBytes, _, err = client.RequestAny(defaultPeerVersion, requestBytes) + responseBytes, _, err = client.SendAppRequestAny(defaultPeerVersion, requestBytes) assert.NoError(t, err) var response TestMessage @@ -296,6 +304,7 @@ func TestOnRequestHonoursDeadline(t *testing.T) { } codecManager := buildCodec(t, TestMessage{}) + crossChainCodecManager := buildCodec(t, ExampleCrossChainRequest{}, ExampleCrossChainResponse{}) requestBytes, err := marshalStruct(codecManager, TestMessage{Message: "hello there"}) assert.NoError(t, err) @@ -303,7 +312,8 @@ func TestOnRequestHonoursDeadline(t *testing.T) { requestHandler := &testRequestHandler{ processingDuration: 500 * time.Millisecond, } - net = NewNetwork(sender, codecManager, ids.EmptyNodeID, 1) + + net = NewNetwork(sender, codecManager, crossChainCodecManager, ids.EmptyNodeID, 1, 1) net.SetRequestHandler(requestHandler) nodeID := ids.GenerateTestNodeID() @@ -323,6 +333,7 @@ func TestOnRequestHonoursDeadline(t *testing.T) { func TestGossip(t *testing.T) { codecManager := buildCodec(t, HelloGossip{}) + crossChainCodecManager := buildCodec(t, ExampleCrossChainRequest{}, ExampleCrossChainResponse{}) nodeID := ids.GenerateTestNodeID() var clientNetwork Network @@ -342,7 +353,7 @@ func TestGossip(t *testing.T) { } gossipHandler := &testGossipHandler{} - clientNetwork = NewNetwork(sender, codecManager, ids.EmptyNodeID, 1) + clientNetwork = NewNetwork(sender, codecManager, crossChainCodecManager, ids.EmptyNodeID, 1, 1) clientNetwork.SetGossipHandler(gossipHandler) assert.NoError(t, clientNetwork.Connected(context.Background(), nodeID, defaultPeerVersion)) @@ -363,12 +374,13 @@ func TestGossip(t *testing.T) { func TestHandleInvalidMessages(t *testing.T) { codecManager := buildCodec(t, HelloGossip{}, TestMessage{}) + crossChainCodecManager := buildCodec(t, ExampleCrossChainRequest{}, ExampleCrossChainResponse{}) nodeID := ids.GenerateTestNodeID() requestID := uint32(1) sender := testAppSender{} - clientNetwork := NewNetwork(sender, codecManager, ids.EmptyNodeID, 1) + clientNetwork := NewNetwork(sender, codecManager, crossChainCodecManager, ids.EmptyNodeID, 1, 1) clientNetwork.SetGossipHandler(message.NoopMempoolGossipHandler{}) clientNetwork.SetRequestHandler(&testRequestHandler{}) @@ -412,12 +424,13 @@ func TestHandleInvalidMessages(t *testing.T) { func TestNetworkPropagatesRequestHandlerError(t *testing.T) { codecManager := buildCodec(t, TestMessage{}) + crossChainCodecManager := buildCodec(t, ExampleCrossChainRequest{}, ExampleCrossChainResponse{}) nodeID := ids.GenerateTestNodeID() requestID := uint32(1) sender := testAppSender{} - clientNetwork := NewNetwork(sender, codecManager, ids.EmptyNodeID, 1) + clientNetwork := NewNetwork(sender, codecManager, crossChainCodecManager, ids.EmptyNodeID, 1, 1) clientNetwork.SetGossipHandler(message.NoopMempoolGossipHandler{}) clientNetwork.SetRequestHandler(&testRequestHandler{err: errors.New("fail")}) // Return an error from the request handler @@ -433,6 +446,125 @@ func TestNetworkPropagatesRequestHandlerError(t *testing.T) { assert.Error(t, clientNetwork.AppRequest(context.Background(), nodeID, requestID, time.Now().Add(time.Second), requestMessage)) } +func TestCrossChainAppRequest(t *testing.T) { + var net Network + codecManager := buildCodec(t, TestMessage{}) + crossChainCodecManager := buildCodec(t, ExampleCrossChainRequest{}, ExampleCrossChainResponse{}) + + sender := testAppSender{ + sendCrossChainAppRequestFn: func(requestingChainID ids.ID, requestID uint32, requestBytes []byte) error { + go func() { + if err := net.CrossChainAppRequest(context.Background(), requestingChainID, requestID, time.Now().Add(5*time.Second), requestBytes); err != nil { + panic(err) + } + }() + return nil + }, + sendCrossChainAppResponseFn: func(respondingChainID ids.ID, requestID uint32, responseBytes []byte) error { + go func() { + if err := net.CrossChainAppResponse(context.Background(), respondingChainID, requestID, responseBytes); err != nil { + panic(err) + } + }() + return nil + }, + } + + net = NewNetwork(sender, codecManager, crossChainCodecManager, ids.EmptyNodeID, 1, 1) + net.SetCrossChainRequestHandler(&testCrossChainHandler{codec: crossChainCodecManager}) + client := NewNetworkClient(net) + + exampleCrossChainRequest := ExampleCrossChainRequest{ + Message: "hello this is an example request", + } + + crossChainRequest, err := buildCrossChainRequest(crossChainCodecManager, exampleCrossChainRequest) + assert.NoError(t, err) + + chainID := ids.ID(ethcommon.BytesToHash([]byte{1, 2, 3, 4, 5})) + responseBytes, err := client.SendCrossChainRequest(chainID, crossChainRequest) + assert.NoError(t, err) + + var response ExampleCrossChainResponse + if _, err = crossChainCodecManager.Unmarshal(responseBytes, &response); err != nil { + t.Fatal("unexpected error during unmarshal", err) + } + assert.Equal(t, "this is an example response", response.Response) +} + +func TestCrossChainRequestRequestsRoutingAndResponse(t *testing.T) { + var ( + callNum uint32 + senderWg sync.WaitGroup + net Network + ) + + sender := testAppSender{ + sendCrossChainAppRequestFn: func(requestingChainID ids.ID, requestID uint32, requestBytes []byte) error { + senderWg.Add(1) + go func() { + defer senderWg.Done() + if err := net.CrossChainAppRequest(context.Background(), requestingChainID, requestID, time.Now().Add(5*time.Second), requestBytes); err != nil { + panic(err) + } + }() + return nil + }, + sendCrossChainAppResponseFn: func(respondingChainID ids.ID, requestID uint32, responseBytes []byte) error { + senderWg.Add(1) + go func() { + defer senderWg.Done() + if err := net.CrossChainAppResponse(context.Background(), respondingChainID, requestID, responseBytes); err != nil { + panic(err) + } + atomic.AddUint32(&callNum, 1) + }() + return nil + }, + } + + codecManager := buildCodec(t, TestMessage{}) + crossChainCodecManager := buildCodec(t, ExampleCrossChainRequest{}, ExampleCrossChainResponse{}) + net = NewNetwork(sender, codecManager, crossChainCodecManager, ids.EmptyNodeID, 1, 1) + net.SetCrossChainRequestHandler(&testCrossChainHandler{codec: crossChainCodecManager}) + client := NewNetworkClient(net) + + exampleCrossChainRequest := ExampleCrossChainRequest{ + Message: "hello this is an example request", + } + + chainID := ids.ID(ethcommon.BytesToHash([]byte{1, 2, 3, 4, 5})) + defer net.Shutdown() + + totalRequests := 500 + numCallsPerRequest := 1 // on sending response + totalCalls := totalRequests * numCallsPerRequest + + var requestWg sync.WaitGroup + requestWg.Add(totalCalls) + + for i := 0; i < totalCalls; i++ { + go func() { + defer requestWg.Done() + crossChainRequest, err := buildCrossChainRequest(crossChainCodecManager, exampleCrossChainRequest) + assert.NoError(t, err) + responseBytes, err := client.SendCrossChainRequest(chainID, crossChainRequest) + assert.NoError(t, err) + assert.NotNil(t, responseBytes) + + var response ExampleCrossChainResponse + if _, err = crossChainCodecManager.Unmarshal(responseBytes, &response); err != nil { + panic(fmt.Errorf("unexpected error during unmarshal: %w", err)) + } + assert.Equal(t, "this is an example response", response.Response) + }() + } + + requestWg.Wait() + senderWg.Wait() + assert.Equal(t, totalCalls, int(atomic.LoadUint32(&callNum))) +} + func buildCodec(t *testing.T, types ...interface{}) codec.Manager { codecManager := codec.NewDefaultManager() c := linearcodec.NewDefault() @@ -453,6 +585,10 @@ func buildGossip(codec codec.Manager, msg message.GossipMessage) ([]byte, error) return codec.Marshal(message.Version, &msg) } +func buildCrossChainRequest(codec codec.Manager, msg message.CrossChainRequest) ([]byte, error) { + return codec.Marshal(message.Version, &msg) +} + type testAppSender struct { sendCrossChainAppRequestFn func(ids.ID, uint32, []byte) error sendCrossChainAppResponseFn func(ids.ID, uint32, []byte) error @@ -600,3 +736,32 @@ func (r *testRequestHandler) handleTestRequest(ctx context.Context, _ ids.NodeID } return r.response, r.err } + +type ExampleCrossChainRequest struct { + Message string `serialize:"true"` +} + +func (e ExampleCrossChainRequest) Handle(ctx context.Context, requestingChainID ids.ID, requestID uint32, handler message.CrossChainRequestHandler) ([]byte, error) { + return handler.(*testCrossChainHandler).HandleCrossChainRequest(ctx, requestingChainID, requestID, e) +} + +func (e ExampleCrossChainRequest) String() string { + return fmt.Sprintf("TestMessage(%s)", e.Message) +} + +type ExampleCrossChainResponse struct { + Response string `serialize:"true"` +} + +type TestCrossChainRequestHandler interface { + HandleCrossChainRequest(ctx context.Context, requestingchainID ids.ID, requestID uint32, exampleRequest message.CrossChainRequest) ([]byte, error) +} + +type testCrossChainHandler struct { + message.CrossChainRequestHandler + codec codec.Manager +} + +func (t *testCrossChainHandler) HandleCrossChainRequest(ctx context.Context, requestingChainID ids.ID, requestID uint32, exampleRequest message.CrossChainRequest) ([]byte, error) { + return t.codec.Marshal(message.Version, ExampleCrossChainResponse{Response: "this is an example response"}) +} diff --git a/peer/stats/stats.go b/peer/stats/stats.go index 2c1eea6713..e29a26e614 100644 --- a/peer/stats/stats.go +++ b/peer/stats/stats.go @@ -9,8 +9,7 @@ import ( "github.com/ava-labs/subnet-evm/metrics" ) -// RequestHandlerStats provides the interface for metrics on request handling. -// Since we drop +// RequestHandlerStats provides the interface for metrics for both app requests and cross chain requests. type RequestHandlerStats interface { UpdateTimeUntilDeadline(duration time.Duration) IncDeadlineDroppedRequest() @@ -35,3 +34,10 @@ func NewRequestHandlerStats() RequestHandlerStats { droppedRequests: metrics.GetOrRegisterCounter("net_req_deadline_dropped", nil), } } + +func NewCrossChainRequestHandlerStats() RequestHandlerStats { + return &requestHandlerStats{ + timeUntilDeadline: metrics.GetOrRegisterTimer("net_cross_chain_req_time_until_deadline", nil), + droppedRequests: metrics.GetOrRegisterCounter("net_cross_chain_req_deadline_dropped", nil), + } +} diff --git a/peer/waiting_handler.go b/peer/waiting_handler.go index 8162817f56..846166c121 100644 --- a/peer/waiting_handler.go +++ b/peer/waiting_handler.go @@ -4,7 +4,6 @@ package peer import ( - "github.com/ava-labs/avalanchego/ids" "github.com/ava-labs/subnet-evm/plugin/evm/message" ) @@ -20,14 +19,14 @@ type waitingResponseHandler struct { } // OnResponse passes the response bytes to the responseChan and closes the channel -func (w *waitingResponseHandler) OnResponse(_ ids.NodeID, _ uint32, response []byte) error { +func (w *waitingResponseHandler) OnResponse(response []byte) error { w.responseChan <- response close(w.responseChan) return nil } // OnFailure sets the failed flag to true and closes the channel -func (w *waitingResponseHandler) OnFailure(ids.NodeID, uint32) error { +func (w *waitingResponseHandler) OnFailure() error { w.failed = true close(w.responseChan) return nil diff --git a/plugin/evm/config.go b/plugin/evm/config.go index 482d81d7d5..fcb0fa297c 100644 --- a/plugin/evm/config.go +++ b/plugin/evm/config.go @@ -15,37 +15,38 @@ import ( ) const ( - defaultAcceptorQueueLimit = 64 // Provides 2 minutes of buffer (2s block target) for a commit delay - defaultPruningEnabled = true - defaultCommitInterval = 4096 - defaultTrieCleanCache = 512 - defaultTrieDirtyCache = 256 - defaultTrieDirtyCommitTarget = 20 - defaultSnapshotCache = 256 - defaultSyncableCommitInterval = defaultCommitInterval * 4 - defaultSnapshotAsync = true - defaultRpcGasCap = 50_000_000 // Default to 50M Gas Limit - defaultRpcTxFeeCap = 100 // 100 AVAX - defaultMetricsExpensiveEnabled = true - defaultApiMaxDuration = 0 // Default to no maximum API call duration - defaultWsCpuRefillRate = 0 // Default to no maximum WS CPU usage - defaultWsCpuMaxStored = 0 // Default to no maximum WS CPU usage - defaultMaxBlocksPerRequest = 0 // Default to no maximum on the number of blocks per getLogs request - defaultContinuousProfilerFrequency = 15 * time.Minute - defaultContinuousProfilerMaxFiles = 5 - defaultRegossipFrequency = 1 * time.Minute - defaultRegossipMaxTxs = 16 - defaultRegossipTxsPerAddress = 1 - defaultPriorityRegossipFrequency = 1 * time.Second - defaultPriorityRegossipMaxTxs = 32 - defaultPriorityRegossipTxsPerAddress = 16 - defaultOfflinePruningBloomFilterSize uint64 = 512 // Default size (MB) for the offline pruner to use - defaultLogLevel = "info" - defaultLogJSONFormat = false - defaultMaxOutboundActiveRequests = 16 - defaultPopulateMissingTriesParallelism = 1024 - defaultStateSyncServerTrieCache = 64 // MB - defaultAcceptedCacheSize = 32 // blocks + defaultAcceptorQueueLimit = 64 // Provides 2 minutes of buffer (2s block target) for a commit delay + defaultPruningEnabled = true + defaultCommitInterval = 4096 + defaultTrieCleanCache = 512 + defaultTrieDirtyCache = 256 + defaultTrieDirtyCommitTarget = 20 + defaultSnapshotCache = 256 + defaultSyncableCommitInterval = defaultCommitInterval * 4 + defaultSnapshotAsync = true + defaultRpcGasCap = 50_000_000 // Default to 50M Gas Limit + defaultRpcTxFeeCap = 100 // 100 AVAX + defaultMetricsExpensiveEnabled = true + defaultApiMaxDuration = 0 // Default to no maximum API call duration + defaultWsCpuRefillRate = 0 // Default to no maximum WS CPU usage + defaultWsCpuMaxStored = 0 // Default to no maximum WS CPU usage + defaultMaxBlocksPerRequest = 0 // Default to no maximum on the number of blocks per getLogs request + defaultContinuousProfilerFrequency = 15 * time.Minute + defaultContinuousProfilerMaxFiles = 5 + defaultRegossipFrequency = 1 * time.Minute + defaultRegossipMaxTxs = 16 + defaultRegossipTxsPerAddress = 1 + defaultPriorityRegossipFrequency = 1 * time.Second + defaultPriorityRegossipMaxTxs = 32 + defaultPriorityRegossipTxsPerAddress = 16 + defaultOfflinePruningBloomFilterSize uint64 = 512 // Default size (MB) for the offline pruner to use + defaultLogLevel = "info" + defaultLogJSONFormat = false + defaultMaxOutboundActiveRequests = 16 + defaultMaxOutboundActiveCrossChainRequests = 64 + defaultPopulateMissingTriesParallelism = 1024 + defaultStateSyncServerTrieCache = 64 // MB + defaultAcceptedCacheSize = 32 // blocks // defaultStateSyncMinBlocks is the minimum number of blocks the blockchain // should be ahead of local last accepted to perform state sync. @@ -171,7 +172,11 @@ type Config struct { OfflinePruningDataDirectory string `json:"offline-pruning-data-directory"` // VM2VM network - MaxOutboundActiveRequests int64 `json:"max-outbound-active-requests"` + MaxOutboundActiveRequests int64 `json:"max-outbound-active-requests"` + MaxOutboundActiveCrossChainRequests int64 `json:"max-outbound-active-cross-chain-requests"` + + // Database Settings + InspectDatabase bool `json:"inspect-database"` // Inspects the database on startup if enabled. // Sync settings StateSyncEnabled bool `json:"state-sync-enabled"` @@ -196,6 +201,12 @@ type Config struct { // This is particularly useful for improving the performance of eth_getLogs // on RPC nodes. AcceptedCacheSize int `json:"accepted-cache-size"` + + // TxLookupLimit is the maximum number of blocks from head whose tx indices + // are reserved: + // * 0: means no limit + // * N: means N block limit [HEAD-N+1, HEAD] and delete extra indexes + TxLookupLimit uint64 `json:"tx-lookup-limit"` } // EthAPIs returns an array of strings representing the Eth APIs that should be enabled @@ -246,6 +257,7 @@ func (c *Config) SetDefaults() { c.LogLevel = defaultLogLevel c.LogJSONFormat = defaultLogJSONFormat c.MaxOutboundActiveRequests = defaultMaxOutboundActiveRequests + c.MaxOutboundActiveCrossChainRequests = defaultMaxOutboundActiveCrossChainRequests c.PopulateMissingTriesParallelism = defaultPopulateMissingTriesParallelism c.StateSyncServerTrieCache = defaultStateSyncServerTrieCache c.StateSyncCommitInterval = defaultSyncableCommitInterval diff --git a/plugin/evm/config_test.go b/plugin/evm/config_test.go index fd92065741..53a7777423 100644 --- a/plugin/evm/config_test.go +++ b/plugin/evm/config_test.go @@ -72,6 +72,34 @@ func TestUnmarshalConfig(t *testing.T) { Config{StateSyncIDs: "NodeID-CaBYJ9kzHvrQFiYWowMkJGAQKGMJqZoat"}, false, }, + { + "empty tx lookup limit", + []byte(`{}`), + Config{TxLookupLimit: 0}, + false, + }, + { + "zero tx lookup limit", + []byte(`{"tx-lookup-limit": 0}`), + func() Config { + return Config{TxLookupLimit: 0} + }(), + false, + }, + { + "1 tx lookup limit", + []byte(`{"tx-lookup-limit": 1}`), + func() Config { + return Config{TxLookupLimit: 1} + }(), + false, + }, + { + "-1 tx lookup limit", + []byte(`{"tx-lookup-limit": -1}`), + Config{}, + true, + }, { "allow unprotected tx hashes", []byte(`{"allow-unprotected-tx-hashes": ["0x803351deb6d745e91545a6a3e1c0ea3e9a6a02a1a4193b70edfcd2f40f71a01c"]}`), diff --git a/plugin/evm/message/codec.go b/plugin/evm/message/codec.go index 8b0fabd550..dcb1a9e5e9 100644 --- a/plugin/evm/message/codec.go +++ b/plugin/evm/message/codec.go @@ -15,7 +15,10 @@ const ( maxMessageSize = 1 * units.MiB ) -var Codec codec.Manager +var ( + Codec codec.Manager + CrossChainCodec codec.Manager +) func init() { Codec = codec.NewManager(maxMessageSize) @@ -43,4 +46,20 @@ func init() { if errs.Errored() { panic(errs.Err) } + + CrossChainCodec = codec.NewManager(maxMessageSize) + ccc := linearcodec.NewDefault() + + errs = wrappers.Errs{} + errs.Add( + // CrossChainRequest Types + ccc.RegisterType(EthCallRequest{}), + ccc.RegisterType(EthCallResponse{}), + + CrossChainCodec.RegisterCodec(Version, ccc), + ) + + if errs.Errored() { + panic(errs.Err) + } } diff --git a/plugin/evm/message/cross_chain_handler.go b/plugin/evm/message/cross_chain_handler.go new file mode 100644 index 0000000000..f356684ee3 --- /dev/null +++ b/plugin/evm/message/cross_chain_handler.go @@ -0,0 +1,73 @@ +// (c) 2021-2022, Ava Labs, Inc. All rights reserved. +// See the file LICENSE for licensing terms. + +package message + +import ( + "context" + "encoding/json" + + "github.com/ava-labs/avalanchego/codec" + "github.com/ava-labs/avalanchego/ids" + + "github.com/ava-labs/subnet-evm/internal/ethapi" + "github.com/ava-labs/subnet-evm/rpc" + + "github.com/ethereum/go-ethereum/log" +) + +var _ CrossChainRequestHandler = &crossChainHandler{} + +// crossChainHandler implements the CrossChainRequestHandler interface +type crossChainHandler struct { + backend ethapi.Backend + crossChainCodec codec.Manager +} + +// NewCrossChainHandler creates and returns a new instance of CrossChainRequestHandler +func NewCrossChainHandler(b ethapi.Backend, codec codec.Manager) CrossChainRequestHandler { + return &crossChainHandler{ + backend: b, + crossChainCodec: codec, + } +} + +// HandleEthCallRequests returns an encoded EthCallResponse to the given [ethCallRequest] +// This function executes EVM Call against the state associated with [rpc.AcceptedBlockNumber] with the given +// transaction call object [ethCallRequest]. +// This function does not return an error as errors are treated as FATAL to the node. +func (c *crossChainHandler) HandleEthCallRequest(ctx context.Context, requestingChainID ids.ID, requestID uint32, ethCallRequest EthCallRequest) ([]byte, error) { + lastAcceptedBlockNumber := rpc.BlockNumber(c.backend.LastAcceptedBlock().NumberU64()) + lastAcceptedBlockNumberOrHash := rpc.BlockNumberOrHash{BlockNumber: &lastAcceptedBlockNumber} + + transactionArgs := ethapi.TransactionArgs{} + err := json.Unmarshal(ethCallRequest.RequestArgs, &transactionArgs) + if err != nil { + log.Debug("error occurred with JSON unmarshalling ethCallRequest.RequestArgs", "err", err) + return nil, nil + } + + result, err := ethapi.DoCall(ctx, c.backend, transactionArgs, lastAcceptedBlockNumberOrHash, nil, c.backend.RPCEVMTimeout(), c.backend.RPCGasCap()) + if err != nil { + log.Debug("error occurred with EthCall", "err", err, "transactionArgs", ethCallRequest.RequestArgs, "blockNumberOrHash", lastAcceptedBlockNumberOrHash) + return nil, nil + } + + executionResult, err := json.Marshal(&result) + if err != nil { + log.Debug("error occurred with JSON marshalling result", "err", err) + return nil, nil + } + + response := EthCallResponse{ + ExecutionResult: executionResult, + } + + responseBytes, err := c.crossChainCodec.Marshal(Version, response) + if err != nil { + log.Warn("error occurred with marshalling EthCallResponse", "err", err, "EthCallResponse", response) + return nil, nil + } + + return responseBytes, nil +} diff --git a/plugin/evm/message/eth_call_request.go b/plugin/evm/message/eth_call_request.go new file mode 100644 index 0000000000..69d1139a2b --- /dev/null +++ b/plugin/evm/message/eth_call_request.go @@ -0,0 +1,33 @@ +// (c) 2021-2022, Ava Labs, Inc. All rights reserved. +// See the file LICENSE for licensing terms. + +package message + +import ( + "context" + "fmt" + + "github.com/ava-labs/avalanchego/ids" +) + +var _ CrossChainRequest = EthCallRequest{} + +// EthCallRequest has the JSON Data necessary to execute a new EVM call on the blockchain +type EthCallRequest struct { + RequestArgs []byte `serialize:"true"` +} + +// EthCallResponse represents the JSON return value of the executed EVM call +type EthCallResponse struct { + ExecutionResult []byte `serialize:"true"` +} + +// String converts EthCallRequest to a string +func (e EthCallRequest) String() string { + return fmt.Sprintf("%#v", e) +} + +// Handle returns the encoded EthCallResponse by executing EVM call with the given EthCallRequest +func (e EthCallRequest) Handle(ctx context.Context, requestingChainID ids.ID, requestID uint32, handler CrossChainRequestHandler) ([]byte, error) { + return handler.HandleEthCallRequest(ctx, requestingChainID, requestID, e) +} diff --git a/plugin/evm/message/handler.go b/plugin/evm/message/handler.go index 4b7bf7c4d7..042e432b65 100644 --- a/plugin/evm/message/handler.go +++ b/plugin/evm/message/handler.go @@ -12,8 +12,9 @@ import ( ) var ( - _ GossipHandler = NoopMempoolGossipHandler{} - _ RequestHandler = NoopRequestHandler{} + _ GossipHandler = NoopMempoolGossipHandler{} + _ RequestHandler = NoopRequestHandler{} + _ CrossChainRequestHandler = NoopCrossChainRequestHandler{} ) // GossipHandler handles incoming gossip messages @@ -43,9 +44,9 @@ type RequestHandler interface { // Only one of OnResponse or OnFailure is called for a given requestID, not both type ResponseHandler interface { // OnResponse is invoked when the peer responded to a request - OnResponse(nodeID ids.NodeID, requestID uint32, response []byte) error + OnResponse(response []byte) error // OnFailure is invoked when there was a failure in processing a request - OnFailure(nodeID ids.NodeID, requestID uint32) error + OnFailure() error } type NoopRequestHandler struct{} @@ -61,3 +62,14 @@ func (NoopRequestHandler) HandleBlockRequest(ctx context.Context, nodeID ids.Nod func (NoopRequestHandler) HandleCodeRequest(ctx context.Context, nodeID ids.NodeID, requestID uint32, codeRequest CodeRequest) ([]byte, error) { return nil, nil } + +// CrossChainRequestHandler interface handles incoming requests from another chain +type CrossChainRequestHandler interface { + HandleEthCallRequest(ctx context.Context, requestingchainID ids.ID, requestID uint32, ethCallRequest EthCallRequest) ([]byte, error) +} + +type NoopCrossChainRequestHandler struct{} + +func (NoopCrossChainRequestHandler) HandleEthCallRequest(ctx context.Context, requestingchainID ids.ID, requestID uint32, ethCallRequest EthCallRequest) ([]byte, error) { + return nil, nil +} diff --git a/plugin/evm/message/request.go b/plugin/evm/message/request.go index 2aadf5a902..6b5831f9df 100644 --- a/plugin/evm/message/request.go +++ b/plugin/evm/message/request.go @@ -34,3 +34,13 @@ func BytesToRequest(codec codec.Manager, requestBytes []byte) (Request, error) { func RequestToBytes(codec codec.Manager, request Request) ([]byte, error) { return codec.Marshal(Version, &request) } + +// CrossChainRequest represents the interface a cross chain request should implement +type CrossChainRequest interface { + // CrossChainRequest should implement String() for logging. + fmt.Stringer + + // Handle allows [CrossChainRequest] to call respective methods on handler to handle + // this particular request type + Handle(ctx context.Context, requestingChainID ids.ID, requestID uint32, handler CrossChainRequestHandler) ([]byte, error) +} diff --git a/plugin/evm/vm.go b/plugin/evm/vm.go index b4d1b4ab9d..cc08501e2b 100644 --- a/plugin/evm/vm.go +++ b/plugin/evm/vm.go @@ -276,6 +276,16 @@ func (vm *VM) Initialize( vm.db = versiondb.New(baseDB) vm.acceptedBlockDB = prefixdb.New(acceptedPrefix, vm.db) vm.metadataDB = prefixdb.New(metadataPrefix, vm.db) + + if vm.config.InspectDatabase { + start := time.Now() + log.Info("Starting database inspection") + if err := rawdb.InspectDatabase(vm.chaindb, nil, nil); err != nil { + return err + } + log.Info("Completed database inspection", "elapsed", time.Since(start)) + } + g := new(core.Genesis) if err := json.Unmarshal(genesisBytes, g); err != nil { return err @@ -348,6 +358,7 @@ func (vm *VM) Initialize( vm.ethConfig.CommitInterval = vm.config.CommitInterval vm.ethConfig.SkipUpgradeCheck = vm.config.SkipUpgradeCheck vm.ethConfig.AcceptedCacheSize = vm.config.AcceptedCacheSize + vm.ethConfig.TxLookupLimit = vm.config.TxLookupLimit // Create directory for offline pruning if len(vm.ethConfig.OfflinePruningDataDirectory) != 0 { @@ -403,7 +414,7 @@ func (vm *VM) Initialize( // initialize peer network vm.networkCodec = message.Codec - vm.Network = peer.NewNetwork(appSender, vm.networkCodec, chainCtx.NodeID, vm.config.MaxOutboundActiveRequests) + vm.Network = peer.NewNetwork(appSender, vm.networkCodec, message.CrossChainCodec, chainCtx.NodeID, vm.config.MaxOutboundActiveRequests, vm.config.MaxOutboundActiveCrossChainRequests) vm.client = peer.NewNetworkClient(vm.Network) if err := vm.initializeChain(lastAcceptedHash, vm.ethConfig); err != nil { @@ -524,6 +535,7 @@ func (vm *VM) initializeStateSyncServer() { }) vm.setAppRequestHandlers() + vm.setCrossChainAppRequestHandler() } func (vm *VM) initChainState(lastAcceptedBlock *types.Block) error { @@ -604,6 +616,13 @@ func (vm *VM) setAppRequestHandlers() { vm.Network.SetRequestHandler(syncRequestHandler) } +// setCrossChainAppRequestHandler sets the request handlers for the VM to serve cross chain +// requests. +func (vm *VM) setCrossChainAppRequestHandler() { + crossChainRequestHandler := message.NewCrossChainHandler(vm.eth.APIBackend, message.CrossChainCodec) + vm.Network.SetCrossChainRequestHandler(crossChainRequestHandler) +} + // Shutdown implements the snowman.ChainVM interface func (vm *VM) Shutdown(context.Context) error { if vm.ctx == nil { diff --git a/plugin/evm/vm_test.go b/plugin/evm/vm_test.go index 554ced4106..65f98e92db 100644 --- a/plugin/evm/vm_test.go +++ b/plugin/evm/vm_test.go @@ -18,11 +18,14 @@ import ( "time" "github.com/ava-labs/subnet-evm/commontype" + "github.com/ava-labs/subnet-evm/internal/ethapi" "github.com/ava-labs/subnet-evm/metrics" + "github.com/ava-labs/subnet-evm/plugin/evm/message" "github.com/ava-labs/subnet-evm/precompile" "github.com/ava-labs/subnet-evm/trie" "github.com/ava-labs/subnet-evm/vmerrs" "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/common/hexutil" "github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/log" @@ -51,6 +54,7 @@ import ( "github.com/ava-labs/subnet-evm/params" "github.com/ava-labs/subnet-evm/rpc" + "github.com/ava-labs/subnet-evm/accounts/abi" accountKeystore "github.com/ava-labs/subnet-evm/accounts/keystore" ) @@ -2987,3 +2991,169 @@ func TestSkipChainConfigCheckCompatible(t *testing.T) { require.NoError(t, err) require.NoError(t, reinitVM.Shutdown(context.Background())) } + +func TestCrossChainMessagestoVM(t *testing.T) { + crossChainCodec := message.CrossChainCodec + require := require.New(t) + + // the following is based on this contract: + // contract T { + // event received(address sender, uint amount, bytes memo); + // event receivedAddr(address sender); + // + // function receive(bytes calldata memo) external payable returns (string memory res) { + // emit received(msg.sender, msg.value, memo); + // emit receivedAddr(msg.sender); + // return "hello world"; + // } + // } + + const abiBin = `0x608060405234801561001057600080fd5b506102a0806100206000396000f3fe60806040526004361061003b576000357c010000000000000000000000000000000000000000000000000000000090048063a69b6ed014610040575b600080fd5b6100b76004803603602081101561005657600080fd5b810190808035906020019064010000000081111561007357600080fd5b82018360208201111561008557600080fd5b803590602001918460018302840111640100000000831117156100a757600080fd5b9091929391929390505050610132565b6040518080602001828103825283818151815260200191508051906020019080838360005b838110156100f75780820151818401526020810190506100dc565b50505050905090810190601f1680156101245780820380516001836020036101000a031916815260200191505b509250505060405180910390f35b60607f75fd880d39c1daf53b6547ab6cb59451fc6452d27caa90e5b6649dd8293b9eed33348585604051808573ffffffffffffffffffffffffffffffffffffffff1673ffffffffffffffffffffffffffffffffffffffff168152602001848152602001806020018281038252848482818152602001925080828437600081840152601f19601f8201169050808301925050509550505050505060405180910390a17f46923992397eac56cf13058aced2a1871933622717e27b24eabc13bf9dd329c833604051808273ffffffffffffffffffffffffffffffffffffffff1673ffffffffffffffffffffffffffffffffffffffff16815260200191505060405180910390a16040805190810160405280600b81526020017f68656c6c6f20776f726c6400000000000000000000000000000000000000000081525090509291505056fea165627a7a72305820ff0c57dad254cfeda48c9cfb47f1353a558bccb4d1bc31da1dae69315772d29e0029` + const abiJSON = `[ { "constant": false, "inputs": [ { "name": "memo", "type": "bytes" } ], "name": "receive", "outputs": [ { "name": "res", "type": "string" } ], "payable": true, "stateMutability": "payable", "type": "function" }, { "anonymous": false, "inputs": [ { "indexed": false, "name": "sender", "type": "address" }, { "indexed": false, "name": "amount", "type": "uint256" }, { "indexed": false, "name": "memo", "type": "bytes" } ], "name": "received", "type": "event" }, { "anonymous": false, "inputs": [ { "indexed": false, "name": "sender", "type": "address" } ], "name": "receivedAddr", "type": "event" } ]` + parsed, err := abi.JSON(strings.NewReader(abiJSON)) + require.NoErrorf(err, "could not parse abi: %v") + + calledSendCrossChainAppResponseFn := false + issuer, vm, _, appSender := GenesisVM(t, true, genesisJSONSubnetEVM, "", "") + + defer func() { + err := vm.Shutdown(context.Background()) + require.NoError(err) + }() + + appSender.SendCrossChainAppResponseF = func(ctx context.Context, respondingChainID ids.ID, requestID uint32, responseBytes []byte) { + calledSendCrossChainAppResponseFn = true + + var response message.EthCallResponse + if _, err = crossChainCodec.Unmarshal(responseBytes, &response); err != nil { + require.NoErrorf(err, "unexpected error during unmarshal: %w") + } + + result := core.ExecutionResult{} + err = json.Unmarshal(response.ExecutionResult, &result) + require.NoError(err) + require.NotNil(result.ReturnData) + + finalResult, err := parsed.Unpack("receive", result.ReturnData) + require.NoError(err) + require.NotNil(finalResult) + require.Equal("hello world", finalResult[0]) + } + + newTxPoolHeadChan := make(chan core.NewTxPoolReorgEvent, 1) + vm.txPool.SubscribeNewReorgEvent(newTxPoolHeadChan) + + tx := types.NewTransaction(uint64(0), testEthAddrs[1], firstTxAmount, 21000, big.NewInt(testMinGasPrice), nil) + signedTx, err := types.SignTx(tx, types.NewEIP155Signer(vm.chainConfig.ChainID), testKeys[0]) + require.NoError(err) + + txErrors := vm.txPool.AddRemotesSync([]*types.Transaction{signedTx}) + for _, err := range txErrors { + require.NoError(err) + } + + <-issuer + + blk1, err := vm.BuildBlock(context.Background()) + require.NoError(err) + + err = blk1.Verify(context.Background()) + require.NoError(err) + + if status := blk1.Status(); status != choices.Processing { + t.Fatalf("Expected status of built block to be %s, but found %s", choices.Processing, status) + } + + err = vm.SetPreference(context.Background(), blk1.ID()) + require.NoError(err) + + err = blk1.Accept(context.Background()) + require.NoError(err) + + newHead := <-newTxPoolHeadChan + if newHead.Head.Hash() != common.Hash(blk1.ID()) { + t.Fatalf("Expected new block to match") + } + + if status := blk1.Status(); status != choices.Accepted { + t.Fatalf("Expected status of accepted block to be %s, but found %s", choices.Accepted, status) + } + + lastAcceptedID, err := vm.LastAccepted(context.Background()) + require.NoError(err) + + if lastAcceptedID != blk1.ID() { + t.Fatalf("Expected last accepted blockID to be the accepted block: %s, but found %s", blk1.ID(), lastAcceptedID) + } + + contractTx := types.NewContractCreation(1, common.Big0, 200000, big.NewInt(testMinGasPrice), common.FromHex(abiBin)) + contractSignedTx, err := types.SignTx(contractTx, types.NewEIP155Signer(vm.chainConfig.ChainID), testKeys[0]) + require.NoError(err) + + errs := vm.txPool.AddRemotesSync([]*types.Transaction{contractSignedTx}) + for _, err := range errs { + require.NoError(err) + } + testAddr := testEthAddrs[0] + contractAddress := crypto.CreateAddress(testAddr, 1) + + <-issuer + + blk2, err := vm.BuildBlock(context.Background()) + require.NoError(err) + + err = blk2.Verify(context.Background()) + require.NoError(err) + + if status := blk2.Status(); status != choices.Processing { + t.Fatalf("Expected status of built block to be %s, but found %s", choices.Processing, status) + } + + err = vm.SetPreference(context.Background(), blk2.ID()) + require.NoError(err) + + err = blk2.Accept(context.Background()) + require.NoError(err) + + newHead = <-newTxPoolHeadChan + if newHead.Head.Hash() != common.Hash(blk2.ID()) { + t.Fatalf("Expected new block to match") + } + + if status := blk2.Status(); status != choices.Accepted { + t.Fatalf("Expected status of accepted block to be %s, but found %s", choices.Accepted, status) + } + + lastAcceptedID, err = vm.LastAccepted(context.Background()) + require.NoError(err) + + if lastAcceptedID != blk2.ID() { + t.Fatalf("Expected last accepted blockID to be the accepted block: %s, but found %s", blk2.ID(), lastAcceptedID) + } + + input, err := parsed.Pack("receive", []byte("X")) + require.NoError(err) + + data := hexutil.Bytes(input) + + requestArgs, err := json.Marshal(ðapi.TransactionArgs{ + To: &contractAddress, + Data: &data, + }) + require.NoError(err) + + var ethCallRequest message.CrossChainRequest = message.EthCallRequest{ + RequestArgs: requestArgs, + } + + crossChainRequest, err := crossChainCodec.Marshal(message.Version, ðCallRequest) + require.NoError(err) + + requestingChainID := ids.ID(common.BytesToHash([]byte{1, 2, 3, 4, 5})) + + // we need all items in the acceptor queue to be processed before we process a cross chain request + vm.blockChain.DrainAcceptorQueue() + err = vm.Network.CrossChainAppRequest(context.Background(), requestingChainID, 1, time.Now().Add(60*time.Second), crossChainRequest) + require.NoError(err) + require.True(calledSendCrossChainAppResponseFn, "sendCrossChainAppResponseFn was not called") +} diff --git a/scripts/versions.sh b/scripts/versions.sh index 075487783c..29f66762e6 100644 --- a/scripts/versions.sh +++ b/scripts/versions.sh @@ -8,4 +8,4 @@ network_runner_version=${NETWORK_RUNNER_VERSION:-'v1.3.5'} ginkgo_version=${GINKGO_VERSION:-'v2.2.0'} # This won't be used, but it's here to make code syncs easier -latest_coreth_version=0.11.3 +latest_coreth_version=0.11.6 diff --git a/sync/client/client.go b/sync/client/client.go index d4cde79bc4..773baf54c5 100644 --- a/sync/client/client.go +++ b/sync/client/client.go @@ -325,14 +325,14 @@ func (c *client) get(ctx context.Context, request message.Request, parseFn parse start time.Time = time.Now() ) if len(c.stateSyncNodes) == 0 { - response, nodeID, err = c.networkClient.RequestAny(StateSyncVersion, requestBytes) + response, nodeID, err = c.networkClient.SendAppRequestAny(StateSyncVersion, requestBytes) } else { // get the next nodeID using the nodeIdx offset. If we're out of nodes, loop back to 0 // we do this every attempt to ensure we get a different node each time if possible. nodeIdx := atomic.AddUint32(&c.stateSyncNodeIdx, 1) nodeID = c.stateSyncNodes[nodeIdx%uint32(len(c.stateSyncNodes))] - response, err = c.networkClient.Request(nodeID, requestBytes) + response, err = c.networkClient.SendAppRequest(nodeID, requestBytes) } metric.UpdateRequestLatency(time.Since(start)) diff --git a/sync/client/mock_network.go b/sync/client/mock_network.go index 16981e4471..b9729350fa 100644 --- a/sync/client/mock_network.go +++ b/sync/client/mock_network.go @@ -28,7 +28,7 @@ type mockNetwork struct { nodesRequested []ids.NodeID } -func (t *mockNetwork) RequestAny(minVersion *version.Application, request []byte) ([]byte, ids.NodeID, error) { +func (t *mockNetwork) SendAppRequestAny(minVersion *version.Application, request []byte) ([]byte, ids.NodeID, error) { if len(t.response) == 0 { return nil, ids.EmptyNodeID, errors.New("no mocked response to return in mockNetwork") } @@ -39,7 +39,7 @@ func (t *mockNetwork) RequestAny(minVersion *version.Application, request []byte return response, ids.EmptyNodeID, err } -func (t *mockNetwork) Request(nodeID ids.NodeID, request []byte) ([]byte, error) { +func (t *mockNetwork) SendAppRequest(nodeID ids.NodeID, request []byte) ([]byte, error) { if len(t.response) == 0 { return nil, errors.New("no mocked response to return in mockNetwork") } @@ -77,6 +77,10 @@ func (t *mockNetwork) Gossip([]byte) error { panic("not implemented") // we don't care about this function for this test } +func (t *mockNetwork) SendCrossChainRequest(chainID ids.ID, request []byte) ([]byte, error) { + panic("not implemented") // we don't care about this function for this test +} + func (t *mockNetwork) mockResponse(times uint8, callback func(), response []byte) { t.response = make([][]byte, times) for i := uint8(0); i < times; i++ {