Skip to content

Commit

Permalink
trie: remove node ordering slice in sync batch (ethereum#19929)
Browse files Browse the repository at this point in the history
When we flush a batch of trie nodes into database during the state
sync, we should guarantee that all children should be flushed before
parent.

Actually the trie nodes commit order is strict by: children -> parent.
But when we flush all ready nodes into db, we don't need the order
anymore since

    (1) they are all ready nodes (no more dependency)
    (2) underlying database provides write atomicity
  • Loading branch information
rjl493456442 authored and enriquefynn committed Feb 15, 2021
1 parent f279af6 commit 4ea617f
Show file tree
Hide file tree
Showing 4 changed files with 63 additions and 46 deletions.
42 changes: 26 additions & 16 deletions core/state/sync_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,15 +136,15 @@ func TestEmptyStateSync(t *testing.T) {
func TestIterativeStateSyncIndividual(t *testing.T) { testIterativeStateSync(t, 1) }
func TestIterativeStateSyncBatched(t *testing.T) { testIterativeStateSync(t, 100) }

func testIterativeStateSync(t *testing.T, batch int) {
func testIterativeStateSync(t *testing.T, count int) {
// Create a random state to copy
srcDb, srcRoot, srcAccounts := makeTestState()

// Create a destination state and sync with the scheduler
dstDb := rawdb.NewMemoryDatabase()
sched := NewStateSync(srcRoot, dstDb, trie.NewSyncBloom(1, dstDb))

queue := append([]common.Hash{}, sched.Missing(batch)...)
queue := append([]common.Hash{}, sched.Missing(count)...)
for len(queue) > 0 {
results := make([]trie.SyncResult, len(queue))
for i, hash := range queue {
Expand All @@ -157,10 +157,12 @@ func testIterativeStateSync(t *testing.T, batch int) {
if _, index, err := sched.Process(results); err != nil {
t.Fatalf("failed to process result #%d: %v", index, err)
}
if index, err := sched.Commit(dstDb); err != nil {
t.Fatalf("failed to commit data #%d: %v", index, err)
batch := dstDb.NewBatch()
if err := sched.Commit(batch); err != nil {
t.Fatalf("failed to commit data: %v", err)
}
queue = append(queue[:0], sched.Missing(batch)...)
batch.Write()
queue = append(queue[:0], sched.Missing(count)...)
}
// Cross check that the two states are in sync
checkStateAccounts(t, dstDb, srcRoot, srcAccounts)
Expand Down Expand Up @@ -190,9 +192,11 @@ func TestIterativeDelayedStateSync(t *testing.T) {
if _, index, err := sched.Process(results); err != nil {
t.Fatalf("failed to process result #%d: %v", index, err)
}
if index, err := sched.Commit(dstDb); err != nil {
t.Fatalf("failed to commit data #%d: %v", index, err)
batch := dstDb.NewBatch()
if err := sched.Commit(batch); err != nil {
t.Fatalf("failed to commit data: %v", err)
}
batch.Write()
queue = append(queue[len(results):], sched.Missing(0)...)
}
// Cross check that the two states are in sync
Expand All @@ -205,7 +209,7 @@ func TestIterativeDelayedStateSync(t *testing.T) {
func TestIterativeRandomStateSyncIndividual(t *testing.T) { testIterativeRandomStateSync(t, 1) }
func TestIterativeRandomStateSyncBatched(t *testing.T) { testIterativeRandomStateSync(t, 100) }

func testIterativeRandomStateSync(t *testing.T, batch int) {
func testIterativeRandomStateSync(t *testing.T, count int) {
// Create a random state to copy
srcDb, srcRoot, srcAccounts := makeTestState()

Expand All @@ -214,7 +218,7 @@ func testIterativeRandomStateSync(t *testing.T, batch int) {
sched := NewStateSync(srcRoot, dstDb, trie.NewSyncBloom(1, dstDb))

queue := make(map[common.Hash]struct{})
for _, hash := range sched.Missing(batch) {
for _, hash := range sched.Missing(count) {
queue[hash] = struct{}{}
}
for len(queue) > 0 {
Expand All @@ -231,11 +235,13 @@ func testIterativeRandomStateSync(t *testing.T, batch int) {
if _, index, err := sched.Process(results); err != nil {
t.Fatalf("failed to process result #%d: %v", index, err)
}
if index, err := sched.Commit(dstDb); err != nil {
t.Fatalf("failed to commit data #%d: %v", index, err)
batch := dstDb.NewBatch()
if err := sched.Commit(batch); err != nil {
t.Fatalf("failed to commit data: %v", err)
}
batch.Write()
queue = make(map[common.Hash]struct{})
for _, hash := range sched.Missing(batch) {
for _, hash := range sched.Missing(count) {
queue[hash] = struct{}{}
}
}
Expand Down Expand Up @@ -277,9 +283,11 @@ func TestIterativeRandomDelayedStateSync(t *testing.T) {
if _, index, err := sched.Process(results); err != nil {
t.Fatalf("failed to process result #%d: %v", index, err)
}
if index, err := sched.Commit(dstDb); err != nil {
t.Fatalf("failed to commit data #%d: %v", index, err)
batch := dstDb.NewBatch()
if err := sched.Commit(batch); err != nil {
t.Fatalf("failed to commit data: %v", err)
}
batch.Write()
for _, hash := range sched.Missing(0) {
queue[hash] = struct{}{}
}
Expand Down Expand Up @@ -316,9 +324,11 @@ func TestIncompleteStateSync(t *testing.T) {
if _, index, err := sched.Process(results); err != nil {
t.Fatalf("failed to process result #%d: %v", index, err)
}
if index, err := sched.Commit(dstDb); err != nil {
t.Fatalf("failed to commit data #%d: %v", index, err)
batch := dstDb.NewBatch()
if err := sched.Commit(batch); err != nil {
t.Fatalf("failed to commit data: %v", err)
}
batch.Write()
for _, result := range results {
added = append(added, result.Hash)
}
Expand Down
2 changes: 1 addition & 1 deletion eth/downloader/statesync.go
Original file line number Diff line number Diff line change
Expand Up @@ -347,7 +347,7 @@ func (s *stateSync) commit(force bool) error {
}
start := time.Now()
b := s.d.stateDB.NewBatch()
if written, err := s.sched.Commit(b); written == 0 || err != nil {
if err := s.sched.Commit(b); err != nil {
return err
}
if err := b.Write(); err != nil {
Expand Down
17 changes: 6 additions & 11 deletions trie/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,14 +57,12 @@ type SyncResult struct {
// persisted data items.
type syncMemBatch struct {
batch map[common.Hash][]byte // In-memory membatch of recently completed items
order []common.Hash // Order of completion to prevent out-of-order data loss
}

// newSyncMemBatch allocates a new memory-buffer for not-yet persisted trie nodes.
func newSyncMemBatch() *syncMemBatch {
return &syncMemBatch{
batch: make(map[common.Hash][]byte),
order: make([]common.Hash, 0, 256),
}
}

Expand Down Expand Up @@ -223,20 +221,18 @@ func (s *Sync) Process(results []SyncResult) (bool, int, error) {
}

// Commit flushes the data stored in the internal membatch out to persistent
// storage, returning the number of items written and any occurred error.
func (s *Sync) Commit(dbw ethdb.KeyValueWriter) (int, error) {
// storage, returning any occurred error.
func (s *Sync) Commit(dbw ethdb.Batch) error {
// Dump the membatch into a database dbw
for i, key := range s.membatch.order {
if err := dbw.Put(key[:], s.membatch.batch[key]); err != nil {
return i, err
for key, value := range s.membatch.batch {
if err := dbw.Put(key[:], value); err != nil {
return err
}
s.bloom.Add(key[:])
}
written := len(s.membatch.order) // TODO(karalabe): could an order change improve write performance?

// Drop the membatch data and return
s.membatch = newSyncMemBatch()
return written, nil
return nil
}

// Pending returns the number of state entries currently pending for download.
Expand Down Expand Up @@ -330,7 +326,6 @@ func (s *Sync) children(req *request, object node) ([]*request, error) {
func (s *Sync) commit(req *request) (err error) {
// Write the node content to the membatch
s.membatch.batch[req.hash] = req.data
s.membatch.order = append(s.membatch.order, req.hash)

delete(s.requests, req.hash)

Expand Down
48 changes: 30 additions & 18 deletions trie/sync_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ func TestEmptySync(t *testing.T) {
func TestIterativeSyncIndividual(t *testing.T) { testIterativeSync(t, 1) }
func TestIterativeSyncBatched(t *testing.T) { testIterativeSync(t, 100) }

func testIterativeSync(t *testing.T, batch int) {
func testIterativeSync(t *testing.T, count int) {
// Create a random trie to copy
srcDb, srcTrie, srcData := makeTestTrie()

Expand All @@ -114,7 +114,7 @@ func testIterativeSync(t *testing.T, batch int) {
triedb := NewDatabase(diskdb)
sched := NewSync(srcTrie.Hash(), diskdb, nil, NewSyncBloom(1, diskdb))

queue := append([]common.Hash{}, sched.Missing(batch)...)
queue := append([]common.Hash{}, sched.Missing(count)...)
for len(queue) > 0 {
results := make([]SyncResult, len(queue))
for i, hash := range queue {
Expand All @@ -127,10 +127,12 @@ func testIterativeSync(t *testing.T, batch int) {
if _, index, err := sched.Process(results); err != nil {
t.Fatalf("failed to process result #%d: %v", index, err)
}
if index, err := sched.Commit(diskdb); err != nil {
t.Fatalf("failed to commit data #%d: %v", index, err)
batch := diskdb.NewBatch()
if err := sched.Commit(batch); err != nil {
t.Fatalf("failed to commit data: %v", err)
}
queue = append(queue[:0], sched.Missing(batch)...)
batch.Write()
queue = append(queue[:0], sched.Missing(count)...)
}
// Cross check that the two tries are in sync
checkTrieContents(t, triedb, srcTrie.Hash().Bytes(), srcData)
Expand Down Expand Up @@ -161,9 +163,11 @@ func TestIterativeDelayedSync(t *testing.T) {
if _, index, err := sched.Process(results); err != nil {
t.Fatalf("failed to process result #%d: %v", index, err)
}
if index, err := sched.Commit(diskdb); err != nil {
t.Fatalf("failed to commit data #%d: %v", index, err)
batch := diskdb.NewBatch()
if err := sched.Commit(batch); err != nil {
t.Fatalf("failed to commit data: %v", err)
}
batch.Write()
queue = append(queue[len(results):], sched.Missing(10000)...)
}
// Cross check that the two tries are in sync
Expand All @@ -176,7 +180,7 @@ func TestIterativeDelayedSync(t *testing.T) {
func TestIterativeRandomSyncIndividual(t *testing.T) { testIterativeRandomSync(t, 1) }
func TestIterativeRandomSyncBatched(t *testing.T) { testIterativeRandomSync(t, 100) }

func testIterativeRandomSync(t *testing.T, batch int) {
func testIterativeRandomSync(t *testing.T, count int) {
// Create a random trie to copy
srcDb, srcTrie, srcData := makeTestTrie()

Expand All @@ -186,7 +190,7 @@ func testIterativeRandomSync(t *testing.T, batch int) {
sched := NewSync(srcTrie.Hash(), diskdb, nil, NewSyncBloom(1, diskdb))

queue := make(map[common.Hash]struct{})
for _, hash := range sched.Missing(batch) {
for _, hash := range sched.Missing(count) {
queue[hash] = struct{}{}
}
for len(queue) > 0 {
Expand All @@ -203,11 +207,13 @@ func testIterativeRandomSync(t *testing.T, batch int) {
if _, index, err := sched.Process(results); err != nil {
t.Fatalf("failed to process result #%d: %v", index, err)
}
if index, err := sched.Commit(diskdb); err != nil {
t.Fatalf("failed to commit data #%d: %v", index, err)
batch := diskdb.NewBatch()
if err := sched.Commit(batch); err != nil {
t.Fatalf("failed to commit data: %v", err)
}
batch.Write()
queue = make(map[common.Hash]struct{})
for _, hash := range sched.Missing(batch) {
for _, hash := range sched.Missing(count) {
queue[hash] = struct{}{}
}
}
Expand Down Expand Up @@ -248,9 +254,11 @@ func TestIterativeRandomDelayedSync(t *testing.T) {
if _, index, err := sched.Process(results); err != nil {
t.Fatalf("failed to process result #%d: %v", index, err)
}
if index, err := sched.Commit(diskdb); err != nil {
t.Fatalf("failed to commit data #%d: %v", index, err)
batch := diskdb.NewBatch()
if err := sched.Commit(batch); err != nil {
t.Fatalf("failed to commit data: %v", err)
}
batch.Write()
for _, result := range results {
delete(queue, result.Hash)
}
Expand Down Expand Up @@ -293,9 +301,11 @@ func TestDuplicateAvoidanceSync(t *testing.T) {
if _, index, err := sched.Process(results); err != nil {
t.Fatalf("failed to process result #%d: %v", index, err)
}
if index, err := sched.Commit(diskdb); err != nil {
t.Fatalf("failed to commit data #%d: %v", index, err)
batch := diskdb.NewBatch()
if err := sched.Commit(batch); err != nil {
t.Fatalf("failed to commit data: %v", err)
}
batch.Write()
queue = append(queue[:0], sched.Missing(0)...)
}
// Cross check that the two tries are in sync
Expand Down Expand Up @@ -329,9 +339,11 @@ func TestIncompleteSync(t *testing.T) {
if _, index, err := sched.Process(results); err != nil {
t.Fatalf("failed to process result #%d: %v", index, err)
}
if index, err := sched.Commit(diskdb); err != nil {
t.Fatalf("failed to commit data #%d: %v", index, err)
batch := diskdb.NewBatch()
if err := sched.Commit(batch); err != nil {
t.Fatalf("failed to commit data: %v", err)
}
batch.Write()
for _, result := range results {
added = append(added, result.Hash)
}
Expand Down

0 comments on commit 4ea617f

Please sign in to comment.