Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

wip [vs master]: separate atomic tx handling #587

Draft
wants to merge 24 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
// (c) 2020-2021, Ava Labs, Inc. All rights reserved.
// See the file LICENSE for licensing terms.

package evm
package atx

import (
"encoding/binary"
Expand All @@ -22,6 +22,12 @@ import (

var _ AtomicBackend = &atomicBackend{}

var (
// Prefixes for atomic trie
atomicTrieDBPrefix = []byte("atomicTrieDB")
atomicTrieMetaDBPrefix = []byte("atomicTrieMetaDB")
)

// AtomicBackend abstracts the verification and processing
// of atomic transactions
type AtomicBackend interface {
Expand Down
44 changes: 16 additions & 28 deletions plugin/evm/atomic_state.go → plugin/atx/atomic_state.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,10 @@
// (c) 2020-2021, Ava Labs, Inc. All rights reserved.
// See the file LICENSE for licensing terms.

package evm
package atx

import (
"fmt"

"github.com/ava-labs/avalanchego/chains/atomic"
"github.com/ava-labs/avalanchego/database"
"github.com/ava-labs/avalanchego/ids"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/log"
Expand All @@ -23,9 +20,9 @@ var _ AtomicState = &atomicState{}
type AtomicState interface {
// Root of the atomic trie after applying the state change.
Root() common.Hash
// Accept applies the state change to VM's persistent storage
// Changes are persisted atomically along with the provided [commitBatch].
Accept(commitBatch database.Batch, requests map[ids.ID]*atomic.Requests) error
// Accept applies the state change to the VM's commit batch, and outputs
// atomic ops to the shared memory writer.
Accept(writer SharedMemoryWriter) error
// Reject frees memory associated with the state change.
Reject() error
}
Expand All @@ -46,11 +43,18 @@ func (a *atomicState) Root() common.Hash {
}

// Accept applies the state change to VM's persistent storage.
func (a *atomicState) Accept(commitBatch database.Batch, requests map[ids.ID]*atomic.Requests) error {
// Add the new requests to the batch to be accepted
for chainID, requests := range requests {
mergeAtomicOpsToMap(a.atomicOps, chainID, requests)
func (a *atomicState) Accept(writer SharedMemoryWriter) error {
// If this is a bonus block, write [commitBatch] without applying atomic ops
// to shared memory.
if a.backend.IsBonus(a.blockHeight, a.blockHash) {
log.Info("skipping atomic tx acceptance on bonus block", "block", a.blockHash)
} else {
// Add the new requests to the batch to be accepted
for chainID, reqs := range a.atomicOps {
writer.AddSharedMemoryRequests(chainID, reqs)
}
}

// Update the atomic tx repository. Note it is necessary to invoke
// the correct method taking bonus blocks into consideration.
if a.backend.IsBonus(a.blockHeight, a.blockHash) {
Expand All @@ -72,23 +76,7 @@ func (a *atomicState) Accept(commitBatch database.Batch, requests map[ids.ID]*at
a.backend.lastAcceptedHash = a.blockHash
delete(a.backend.verifiedRoots, a.blockHash)

// get changes from the atomic trie and repository in a batch
// to be committed atomically with [commitBatch] and shared memory.
atomicChangesBatch, err := a.backend.db.CommitBatch()
if err != nil {
return fmt.Errorf("could not create commit batch in atomicState accept: %w", err)
}

// If this is a bonus block, write [commitBatch] without applying atomic ops
// to shared memory.
if a.backend.IsBonus(a.blockHeight, a.blockHash) {
log.Info("skipping atomic tx acceptance on bonus block", "block", a.blockHash)
return atomic.WriteAll(commitBatch, atomicChangesBatch)
}

// Otherwise, atomically commit pending changes in the version db with
// atomic ops to shared memory.
return a.backend.sharedMemory.Apply(a.atomicOps, commitBatch, atomicChangesBatch)
return nil
}

// Reject frees memory associated with the state change.
Expand Down
11 changes: 10 additions & 1 deletion plugin/evm/atomic_syncer.go → plugin/atx/atomic_syncer.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
// (c) 2019-2022, Ava Labs, Inc. All rights reserved.
// See the file LICENSE for licensing terms.

package evm
package atx

import (
"bytes"
Expand All @@ -24,6 +24,15 @@ var (
_ syncclient.LeafSyncTask = &atomicSyncerLeafTask{}
)

// Syncer represents a step in state sync,
// along with Start/Done methods to control
// and monitor progress.
// Error returns an error if any was encountered.
type Syncer interface {
Start(ctx context.Context) error
Done() <-chan error
}

// atomicSyncer is used to sync the atomic trie from the network. The CallbackLeafSyncer
// is responsible for orchestrating the sync while atomicSyncer is responsible for maintaining
// the state of progress and writing the actual atomic trie to the trieDB.
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
// (c) 2019-2020, Ava Labs, Inc. All rights reserved.
// See the file LICENSE for licensing terms.

package evm
package atx

import (
"bytes"
Expand All @@ -27,6 +27,7 @@ import (
)

const commitInterval = 1024
const defaultStateSyncRequestSize = 1024 // the number of key/values to ask peers for per request

type atomicSyncTestCheckpoint struct {
expectedNumLeavesSynced int64 // expected number of leaves to have synced at this checkpoint
Expand Down
5 changes: 3 additions & 2 deletions plugin/evm/atomic_trie.go → plugin/atx/atomic_trie.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
// (c) 2020-2021, Ava Labs, Inc. All rights reserved.
// See the file LICENSE for licensing terms.

package evm
package atx

import (
"fmt"
Expand All @@ -17,6 +17,7 @@ import (
"github.com/ava-labs/coreth/core"
"github.com/ava-labs/coreth/core/rawdb"
"github.com/ava-labs/coreth/core/types"
"github.com/ava-labs/coreth/plugin/db"
"github.com/ava-labs/coreth/trie"
"github.com/ava-labs/coreth/trie/triedb/hashdb"
"github.com/ava-labs/coreth/trie/trienode"
Expand Down Expand Up @@ -151,7 +152,7 @@ func newAtomicTrie(
}

trieDB := trie.NewDatabase(
rawdb.NewDatabase(Database{atomicTrieDB}),
rawdb.NewDatabase(db.Database{Database: atomicTrieDB}),
&trie.Config{
HashDB: &hashdb.Config{
CleanCacheSize: 64 * units.MiB, // Allocate 64MB of memory for clean cache
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
// (c) 2019-2020, Ava Labs, Inc. All rights reserved.
// See the file LICENSE for licensing terms.

package evm
package atx

import (
"encoding/binary"
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
// (c) 2020-2021, Ava Labs, Inc. All rights reserved.
// See the file LICENSE for licensing terms.

package evm
package atx

import (
"testing"
Expand All @@ -16,6 +16,10 @@ import (
"github.com/stretchr/testify/require"
)

var (
testCChainID = ids.ID{'c', 'c', 'h', 'a', 'i', 'n', 't', 'e', 's', 't'}
)

func testSharedMemory() atomic.SharedMemory {
m := atomic.NewMemory(memdb.New())
return m.NewSharedMemory(testCChainID)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
// (c) 2020-2021, Ava Labs, Inc. All rights reserved.
// See the file LICENSE for licensing terms.

package evm
package atx

import (
"encoding/binary"
Expand All @@ -25,15 +25,14 @@ import (

const testCommitInterval = 100

func (tx *Tx) mustAtomicOps() map[ids.ID]*atomic.Requests {
func mustAtomicOps(tx *Tx) map[ids.ID]*atomic.Requests {
id, reqs, err := tx.AtomicOps()
if err != nil {
panic(err)
}
return map[ids.ID]*atomic.Requests{id: reqs}
}

// indexAtomicTxs updates [tr] with entries in [atomicOps] at height by creating
// a new snapshot, calculating a new root, and calling InsertTrie followed
// by AcceptTrie on the new root.
func indexAtomicTxs(tr AtomicTrie, height uint64, atomicOps map[ids.ID]*atomic.Requests) error {
Expand Down Expand Up @@ -281,7 +280,7 @@ func TestIndexerWriteAndRead(t *testing.T) {

// process 305 blocks so that we get three commits (100, 200, 300)
for height := uint64(1); height <= testCommitInterval*3+5; /*=305*/ height++ {
atomicRequests := testDataImportTx().mustAtomicOps()
atomicRequests := mustAtomicOps(testDataImportTx())
err := indexAtomicTxs(atomicTrie, height, atomicRequests)
assert.NoError(t, err)
if height%testCommitInterval == 0 {
Expand Down Expand Up @@ -370,7 +369,7 @@ func TestIndexingNilShouldNotImpactTrie(t *testing.T) {
// operations to index
ops := make([]map[ids.ID]*atomic.Requests, 0)
for i := 0; i <= testCommitInterval; i++ {
ops = append(ops, testDataImportTx().mustAtomicOps())
ops = append(ops, mustAtomicOps(testDataImportTx()))
}

// without nils
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
// (c) 2020-2021, Ava Labs, Inc. All rights reserved.
// See the file LICENSE for licensing terms.

package evm
package atx

import (
"encoding/binary"
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
// (c) 2020-2021, Ava Labs, Inc. All rights reserved.
// See the file LICENSE for licensing terms.

package evm
package atx

import (
"encoding/binary"
Expand Down
102 changes: 102 additions & 0 deletions plugin/atx/block.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
package atx

import (
"context"

"github.com/ava-labs/avalanchego/chains/atomic"
"github.com/ava-labs/avalanchego/ids"
"github.com/ava-labs/avalanchego/snow/consensus/snowman"
snowmanblock "github.com/ava-labs/avalanchego/snow/engine/snowman/block"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/log"
)

type SharedMemoryWriter interface {
AddSharedMemoryRequests(chainID ids.ID, requests *atomic.Requests)
}

type BlockWithVerifyContext interface {
snowman.Block
snowmanblock.WithVerifyContext
}

type BlockImpl struct {
BlockWithVerifyContext

Txs []*Tx
SharedMemoryWriter SharedMemoryWriter
VM *VM
}

func (b *BlockImpl) Accept(ctx context.Context) error {
vm := b.VM
for _, tx := range b.Txs {
// Remove the accepted transaction from the mempool
vm.Mempool().RemoveTx(tx)
}

// Update VM state for atomic txs in this block. This includes updating the
// atomic tx repo, atomic trie, and shared memory.
atomicState, err := vm.atomicBackend.GetVerifiedAtomicState(common.Hash(b.ID()))
if err != nil {
// should never occur since [b] must be verified before calling Accept
return err
}
if err := atomicState.Accept(b.SharedMemoryWriter); err != nil {
return err
}
if err := b.BlockWithVerifyContext.Accept(ctx); err != nil {
// reinject txs to backend if accept fails
blk := b.BlockWithVerifyContext
if _, err := b.VM.atomicBackend.InsertTxs(common.Hash(blk.ID()), blk.Height(), common.Hash(blk.Parent()), b.Txs); err != nil {
log.Error("Failed to re-inject transactions in accepted block", "blockID", blk.ID(), "err", err)
}
return err
}

return nil
}

func (b *BlockImpl) VerifyWithContext(ctx context.Context, proposerVMBlockCtx *snowmanblock.Context) error {
// verify UTXOs named in import txs are present in shared memory.
if err := b.VM.VerifyUTXOsPresent(common.Hash(b.ID()), b.Height(), b.Txs); err != nil {
return err
}
blk := b.BlockWithVerifyContext
if err := blk.VerifyWithContext(ctx, proposerVMBlockCtx); err != nil {
return err
}
_, err := b.VM.atomicBackend.InsertTxs(common.Hash(blk.ID()), blk.Height(), common.Hash(blk.Parent()), b.Txs)
return err
}

func (b *BlockImpl) Verify(ctx context.Context) error {
// verify UTXOs named in import txs are present in shared memory.
if err := b.VM.VerifyUTXOsPresent(common.Hash(b.ID()), b.Height(), b.Txs); err != nil {
return err
}
blk := b.BlockWithVerifyContext
if err := blk.Verify(ctx); err != nil {
return err
}
_, err := b.VM.atomicBackend.InsertTxs(common.Hash(blk.ID()), blk.Height(), common.Hash(blk.Parent()), b.Txs)
return err
}

func (b *BlockImpl) Reject(ctx context.Context) error {
for _, tx := range b.Txs {
b.VM.Mempool().RemoveTx(tx)
if err := b.VM.Mempool().AddTx(tx); err != nil {
log.Debug("Failed to re-issue transaction in rejected block", "txID", tx.ID(), "err", err)
}
}
atomicState, err := b.VM.atomicBackend.GetVerifiedAtomicState(common.Hash(b.ID()))
if err != nil {
// should never occur since [b] must be verified before calling Reject
return err
}
if err := atomicState.Reject(); err != nil {
return err
}
return b.BlockWithVerifyContext.Reject(ctx)
}
Loading
Loading