Skip to content

Commit

Permalink
Coreth 0.11.6 sync (#436)
Browse files Browse the repository at this point in the history
* sync changes from coreth-0.11.6

* remove diff patch

* revert format changes

* nits (#441)
  • Loading branch information
ceyonur authored Jan 23, 2023
1 parent 0a2e17e commit dbc1b4d
Show file tree
Hide file tree
Showing 28 changed files with 1,650 additions and 170 deletions.
37 changes: 18 additions & 19 deletions .github/CONTRIBUTING.md
Original file line number Diff line number Diff line change
@@ -1,35 +1,34 @@
# Contributing

Thank you for considering to help out with the source code! We welcome
contributions from anyone on the internet, and are grateful for even the
Thank you for considering to help out with the source code! We welcome
contributions from anyone on the internet, and are grateful for even the
smallest of fixes!

If you'd like to contribute to coreth, please fork, fix, commit and send a
If you'd like to contribute to subnet-evm, please fork, fix, commit and send a
pull request for the maintainers to review and merge into the main code base. If
you wish to submit more complex changes though, please check up with the core
devs first on [Discord](https://chat.avalabs.org) to
ensure those changes are in line with the general philosophy of the project
you wish to submit more complex changes though, please check up with the core
devs first on [Discord](https://chat.avalabs.org) to
ensure those changes are in line with the general philosophy of the project
and/or get some early feedback which can make both your efforts much lighter as
well as our review and merge procedures quick and simple.

## Coding guidelines

Please make sure your contributions adhere to our coding guidelines:

* Code must adhere to the official Go
[formatting](https://golang.org/doc/effective_go.html#formatting) guidelines
(i.e. uses [gofmt](https://golang.org/cmd/gofmt/)).
* Code must be documented adhering to the official Go
[commentary](https://golang.org/doc/effective_go.html#commentary) guidelines.
* Pull requests need to be based on and opened against the `master` branch.
* Pull requests should include a detailed description
* Commits are required to be signed. See [here](https://docs.github.com/en/authentication/managing-commit-signature-verification/signing-commits)
for information on signing commits.
* Commit messages should be prefixed with the package(s) they modify.
* E.g. "eth, rpc: make trace configs optional"
- Code must adhere to the official Go
[formatting](https://golang.org/doc/effective_go.html#formatting) guidelines
(i.e. uses [gofmt](https://golang.org/cmd/gofmt/)).
- Code must be documented adhering to the official Go
[commentary](https://golang.org/doc/effective_go.html#commentary) guidelines.
- Pull requests need to be based on and opened against the `master` branch.
- Pull requests should include a detailed description
- Commits are required to be signed. See [here](https://docs.github.com/en/authentication/managing-commit-signature-verification/signing-commits)
for information on signing commits.
- Commit messages should be prefixed with the package(s) they modify.
- E.g. "eth, rpc: make trace configs optional"

## Can I have feature X

Before you submit a feature request, please check and make sure that it isn't
Before you submit a feature request, please check and make sure that it isn't
possible through some other means.

106 changes: 93 additions & 13 deletions core/blockchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,9 +85,13 @@ var (
acceptedBlockGasUsedCounter = metrics.NewRegisteredCounter("chain/block/gas/used/accepted", nil)
badBlockCounter = metrics.NewRegisteredCounter("chain/block/bad/count", nil)

txUnindexTimer = metrics.NewRegisteredCounter("chain/txs/unindex", nil)
acceptedTxsCounter = metrics.NewRegisteredCounter("chain/txs/accepted", nil)
processedTxsCounter = metrics.NewRegisteredCounter("chain/txs/processed", nil)

acceptedLogsCounter = metrics.NewRegisteredCounter("chain/logs/accepted", nil)
processedLogsCounter = metrics.NewRegisteredCounter("chain/logs/processed", nil)

ErrRefuseToCorruptArchiver = errors.New("node has operated with pruning disabled, shutting down to prevent missing tries")

errFutureBlockUnsupported = errors.New("future block insertion not supported")
Expand All @@ -102,7 +106,6 @@ const (
feeConfigCacheLimit = 256
coinbaseConfigCacheLimit = 256
badBlockLimit = 10
TriesInMemory = 128

// BlockChainVersion ensures that an incompatible database forces a resync from scratch.
//
Expand Down Expand Up @@ -173,6 +176,7 @@ type CacheConfig struct {
SkipSnapshotRebuild bool // Whether to skip rebuilding the snapshot in favor of returning an error (only set to true for tests)
Preimages bool // Whether to store preimage of trie key to the disk
AcceptedCacheSize int // Depth of accepted headers cache and accepted logs cache at the accepted tip
TxLookupLimit uint64 // Number of recent blocks for which to maintain transaction lookup indices
}

var DefaultCacheConfig = &CacheConfig{
Expand Down Expand Up @@ -269,9 +273,8 @@ type BlockChain struct {
// during shutdown and in tests.
acceptorWg sync.WaitGroup

// [rejournalWg] is used to wait for the trie clean rejournaling to complete.
// This is used during shutdown.
rejournalWg sync.WaitGroup
// [wg] is used to wait for the async blockchain processes to finish on shutdown.
wg sync.WaitGroup

// quit channel is used to listen for when the blockchain is shut down to close
// async processes.
Expand Down Expand Up @@ -354,6 +357,13 @@ func NewBlockChain(
// Create the state manager
bc.stateManager = NewTrieWriter(bc.stateCache.TrieDB(), cacheConfig)

// loadLastState writes indices, so we should start the tx indexer after that.
// Start tx indexer/unindexer here.
if bc.cacheConfig.TxLookupLimit != 0 {
bc.wg.Add(1)
go bc.dispatchTxUnindexer()
}

// Re-generate current block state if it is missing
if err := bc.loadLastState(lastAcceptedHash); err != nil {
return nil, err
Expand Down Expand Up @@ -401,16 +411,82 @@ func NewBlockChain(
log.Info("Starting to save trie clean cache periodically", "journalDir", bc.cacheConfig.TrieCleanJournal, "freq", bc.cacheConfig.TrieCleanRejournal)

triedb := bc.stateCache.TrieDB()
bc.rejournalWg.Add(1)
bc.wg.Add(1)
go func() {
defer bc.rejournalWg.Done()
defer bc.wg.Done()
triedb.SaveCachePeriodically(bc.cacheConfig.TrieCleanJournal, bc.cacheConfig.TrieCleanRejournal, bc.quit)
}()
}

return bc, nil
}

// dispatchTxUnindexer is responsible for the deletion of the
// transaction index.
// Invariant: If TxLookupLimit is 0, it means all tx indices will be preserved.
// Meaning that this function should never be called.
//
// It runs as a background goroutine (the caller has already done bc.wg.Add(1))
// and, on every accepted chain head, prunes transaction lookup entries for
// blocks older than head-TxLookupLimit+1. It exits when bc.quit is closed.
func (bc *BlockChain) dispatchTxUnindexer() {
	defer bc.wg.Done()
	txLookupLimit := bc.cacheConfig.TxLookupLimit

	// If the user just upgraded to a new version which supports transaction
	// index pruning, write the new tail and remove anything older.
	if rawdb.ReadTxIndexTail(bc.db) == nil {
		rawdb.WriteTxIndexTail(bc.db, 0)
	}

	// unindexes transactions depending on user configuration
	// [tail, head-txLookupLimit] (inclusive) is the range removed; the done
	// channel signals completion back to the dispatch loop below.
	unindexBlocks := func(tail uint64, head uint64, done chan struct{}) {
		start := time.Now()
		defer func() {
			txUnindexTimer.Inc(time.Since(start).Milliseconds())
			done <- struct{}{}
		}()

		// Update the transaction index to the new chain state
		if head-txLookupLimit+1 >= tail {
			// Unindex a part of stale indices and forward index tail to HEAD-limit
			rawdb.UnindexTransactions(bc.db, tail, head-txLookupLimit+1, bc.quit)
		}
	}
	// Any reindexing done, start listening to chain events and moving the index window
	var (
		done   chan struct{}                  // Non-nil if background unindexing or reindexing routine is active.
		headCh = make(chan ChainEvent, 1)     // Buffered to avoid locking up the event feed
	)
	sub := bc.SubscribeChainAcceptedEvent(headCh)
	if sub == nil {
		log.Warn("could not create chain accepted subscription to unindex txs")
		return
	}
	defer sub.Unsubscribe()

	for {
		select {
		case head := <-headCh:
			headNum := head.Block.NumberU64()
			// Nothing to prune until the chain is taller than the lookup window.
			if headNum < txLookupLimit {
				break
			}

			// Only one unindex pass runs at a time; new heads arriving while a
			// pass is active are coalesced (done != nil skips the spawn).
			if done == nil {
				done = make(chan struct{})
				// Note: tail will not be nil since it is initialized in this function.
				tail := rawdb.ReadTxIndexTail(bc.db)
				go unindexBlocks(*tail, headNum, done)
			}
		case <-done:
			done = nil
		case <-bc.quit:
			// Shutting down: wait for any in-flight pass before returning so
			// the database is not written to after Stop completes.
			if done != nil {
				log.Info("Waiting background transaction indexer to exit")
				<-done
			}
			return
		}
	}
}

// writeBlockAcceptedIndices writes any indices that must be persisted for accepted block.
// This includes the following:
// - transaction lookup indices
Expand Down Expand Up @@ -532,6 +608,9 @@ func (bc *BlockChain) startAcceptor() {

acceptorWorkTimer.Inc(time.Since(start).Milliseconds())
acceptorWorkCount.Inc(1)
// Note: in contrast to most accepted metrics, we increment the accepted log metrics in the acceptor queue because
// the logs are already processed in the acceptor queue.
acceptedLogsCounter.Inc(int64(len(logs)))
}
}

Expand All @@ -555,8 +634,8 @@ func (bc *BlockChain) addAcceptorQueue(b *types.Block) {
// DrainAcceptorQueue blocks until all items in [acceptorQueue] have been
// processed.
func (bc *BlockChain) DrainAcceptorQueue() {
bc.acceptorClosingLock.Lock()
defer bc.acceptorClosingLock.Unlock()
bc.acceptorClosingLock.RLock()
defer bc.acceptorClosingLock.RUnlock()

if bc.acceptorClosed {
return
Expand Down Expand Up @@ -782,7 +861,8 @@ func (bc *BlockChain) ValidateCanonicalChain() error {
// Transactions are only indexed beneath the last accepted block, so we only check
// that the transactions have been indexed, if we are checking below the last accepted
// block.
if current.NumberU64() <= bc.lastAccepted.NumberU64() {
shouldIndexTxs := bc.cacheConfig.TxLookupLimit == 0 || bc.lastAccepted.NumberU64() < current.NumberU64()+bc.cacheConfig.TxLookupLimit
if current.NumberU64() <= bc.lastAccepted.NumberU64() && shouldIndexTxs {
// Ensure that all of the transactions have been stored correctly in the canonical
// chain
for txIndex, tx := range txs {
Expand Down Expand Up @@ -840,7 +920,6 @@ func (bc *BlockChain) Stop() {
return
}

// Wait for accepted feed to process all remaining items
log.Info("Closing quit channel")
close(bc.quit)
// Wait for accepted feed to process all remaining items
Expand Down Expand Up @@ -868,9 +947,9 @@ func (bc *BlockChain) Stop() {
log.Info("Closing scope")
bc.scope.Close()

// Waiting for clean trie re-journal to complete
log.Info("Waiting for trie re-journal to complete")
bc.rejournalWg.Wait()
// Waiting for background processes to complete
log.Info("Waiting for background processes to complete")
bc.wg.Wait()

log.Info("Blockchain stopped")
}
Expand Down Expand Up @@ -1313,6 +1392,7 @@ func (bc *BlockChain) insertBlock(block *types.Block, writes bool) error {

processedBlockGasUsedCounter.Inc(int64(block.GasUsed()))
processedTxsCounter.Inc(int64(block.Transactions().Len()))
processedLogsCounter.Inc(int64(len(logs)))
blockInsertCount.Inc(1)
return nil
}
Expand Down
139 changes: 139 additions & 0 deletions core/blockchain_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -745,3 +745,142 @@ func TestCanonicalHashMarker(t *testing.T) {
}
}
}

// TestTransactionIndices verifies that transaction lookup indices are written
// for all accepted blocks when TxLookupLimit is unset, and that restarting the
// chain with progressively different TxLookupLimit values moves the stored
// index tail and deletes stale lookup entries below it.
func TestTransactionIndices(t *testing.T) {
	// Configure and generate a sample block chain
	require := require.New(t)
	var (
		gendb   = rawdb.NewMemoryDatabase()
		key1, _ = crypto.HexToECDSA("b71c71a67e1177ad4e901695e1b4b9ee17ae16c6668d313eac2f96dbcda3f291")
		key2, _ = crypto.HexToECDSA("8a1f9a8f95be41cd7ccb6168179afb4504aefe388d1e14474d32c45c72ce7b7a")
		addr1   = crypto.PubkeyToAddress(key1.PublicKey)
		addr2   = crypto.PubkeyToAddress(key2.PublicKey)
		funds   = big.NewInt(10000000000000)
		gspec   = &Genesis{
			Config: &params.ChainConfig{HomesteadBlock: new(big.Int)},
			Alloc:  GenesisAlloc{addr1: {Balance: funds}},
		}
		genesis = gspec.MustCommit(gendb)
		signer  = types.LatestSigner(gspec.Config)
	)
	height := uint64(128)
	// Every generated block carries exactly one addr1->addr2 transfer, so each
	// block should produce one lookup entry.
	blocks, _, err := GenerateChain(gspec.Config, genesis, dummy.NewFaker(), gendb, int(height), 10, func(i int, block *BlockGen) {
		tx, err := types.SignTx(types.NewTransaction(block.TxNonce(addr1), addr2, big.NewInt(10000), params.TxGas, nil, nil), signer, key1)
		require.NoError(err)
		block.AddTx(tx)
	})
	require.NoError(err)

	// Extra (empty) blocks appended one at a time later to trigger unindexing.
	blocks2, _, err := GenerateChain(gspec.Config, blocks[len(blocks)-1], dummy.NewFaker(), gendb, 10, 10, nil)
	require.NoError(err)

	// check asserts the stored tx index tail equals [tail], that every block at
	// or above the tail has lookup entries, and every block below it has none.
	// A nil [tail] means no tail marker is expected (all txs indexed).
	check := func(tail *uint64, chain *BlockChain) {
		stored := rawdb.ReadTxIndexTail(chain.db)
		require.EqualValues(tail, stored)

		if tail == nil {
			return
		}
		// Blocks in [tail, head] must be indexed.
		for i := *tail; i <= chain.CurrentBlock().NumberU64(); i++ {
			block := rawdb.ReadBlock(chain.db, rawdb.ReadCanonicalHash(chain.db, i), i)
			if block.Transactions().Len() == 0 {
				continue
			}
			for _, tx := range block.Transactions() {
				index := rawdb.ReadTxLookupEntry(chain.db, tx.Hash())
				require.NotNilf(index, "Miss transaction indices, number %d hash %s", i, tx.Hash().Hex())
			}
		}

		// Blocks below the tail must have had their indices removed.
		for i := uint64(0); i < *tail; i++ {
			block := rawdb.ReadBlock(chain.db, rawdb.ReadCanonicalHash(chain.db, i), i)
			if block.Transactions().Len() == 0 {
				continue
			}
			for _, tx := range block.Transactions() {
				index := rawdb.ReadTxLookupEntry(chain.db, tx.Hash())
				require.Nilf(index, "Transaction indices should be deleted, number %d hash %s", i, tx.Hash().Hex())
			}
		}
	}

	conf := &CacheConfig{
		TrieCleanLimit:        256,
		TrieDirtyLimit:        256,
		TrieDirtyCommitTarget: 20,
		Pruning:               true,
		CommitInterval:        4096,
		SnapshotLimit:         256,
		SkipSnapshotRebuild:   true, // Ensure the test errors if snapshot initialization fails
		AcceptorQueueLimit:    64,
	}

	// Init block chain and check all needed indices has been indexed.
	chainDB := rawdb.NewMemoryDatabase()
	gspec.MustCommit(chainDB)

	chain, err := createBlockChain(chainDB, conf, gspec.Config, common.Hash{})
	require.NoError(err)

	_, err = chain.InsertChain(blocks)
	require.NoError(err)

	for _, block := range blocks {
		err := chain.Accept(block)
		require.NoError(err)
	}
	// Lookup indices are written by the acceptor, so drain it before checking.
	chain.DrainAcceptorQueue()

	chain.Stop()
	check(nil, chain) // check all indices has been indexed

	lastAcceptedHash := chain.CurrentHeader().Hash()

	// Reconstruct a block chain which only reserves limited tx indices
	// 128 blocks were previously indexed. Now we add a new block at each test step.
	limit := []uint64{130 /* 129 + 1 reserve all */, 64 /* drop stale */, 32 /* shorten history */}
	tails := []uint64{0 /* reserve all */, 67 /* 130 - 64 + 1 */, 100 /* 131 - 32 + 1 */}
	for i, l := range limit {
		conf.TxLookupLimit = l

		chain, err := createBlockChain(chainDB, conf, gspec.Config, lastAcceptedHash)
		require.NoError(err)

		newBlks := blocks2[i : i+1]
		_, err = chain.InsertChain(newBlks) // Feed chain a higher block to trigger indices updater.
		require.NoError(err)

		err = chain.Accept(newBlks[0]) // Accept the block to trigger indices updater.
		require.NoError(err)

		chain.DrainAcceptorQueue()
		// NOTE(review): unindexing runs in a background goroutine with no
		// completion signal exposed here, so the test sleeps; this could be
		// flaky on slow machines — confirm whether a sync hook is available.
		time.Sleep(50 * time.Millisecond) // Wait for indices initialisation

		chain.Stop()
		check(&tails[i], chain)

		lastAcceptedHash = chain.CurrentHeader().Hash()
	}
}

// TestTxLookupBlockChain runs the shared blockchain test suite (the
// file-level [tests] table) against a chain configured with a small
// TxLookupLimit, exercising the tx unindexer alongside normal operation.
func TestTxLookupBlockChain(t *testing.T) {
	cacheConf := &CacheConfig{
		TrieCleanLimit:        256,
		TrieDirtyLimit:        256,
		TrieDirtyCommitTarget: 20,
		Pruning:               true,
		CommitInterval:        4096,
		SnapshotLimit:         256,
		SkipSnapshotRebuild:   true, // Ensure the test errors if snapshot initialization fails
		AcceptorQueueLimit:    64,   // ensure channel doesn't block
		TxLookupLimit:         5,
	}
	// Factory matching the suite's constructor signature, closing over the
	// tx-lookup-limited cache config above.
	createTxLookupBlockChain := func(db ethdb.Database, chainConfig *params.ChainConfig, lastAcceptedHash common.Hash) (*BlockChain, error) {
		return createBlockChain(db, cacheConf, chainConfig, lastAcceptedHash)
	}
	for _, tt := range tests {
		t.Run(tt.Name, func(t *testing.T) {
			tt.testFunc(t, createTxLookupBlockChain)
		})
	}
}
Loading

0 comments on commit dbc1b4d

Please sign in to comment.