From 4ea617f2f555254317ff22540b51eed2784ab696 Mon Sep 17 00:00:00 2001 From: gary rong Date: Tue, 29 Oct 2019 02:50:11 +0900 Subject: [PATCH] trie: remove node ordering slice in sync batch (#19929) When we flush a batch of trie nodes into database during the state sync, we should guarantee that all children should be flushed before parent. Actually the trie nodes commit order is strict by: children -> parent. But when we flush all ready nodes into db, we don't need the order anymore since (1) they are all ready nodes (no more dependency) (2) underlying database provides write atomicity --- core/state/sync_test.go | 42 +++++++++++++++++++------------- eth/downloader/statesync.go | 2 +- trie/sync.go | 17 +++++-------- trie/sync_test.go | 48 +++++++++++++++++++++++-------------- 4 files changed, 63 insertions(+), 46 deletions(-) diff --git a/core/state/sync_test.go b/core/state/sync_test.go index de098dce0fea..f4a221bd9df2 100644 --- a/core/state/sync_test.go +++ b/core/state/sync_test.go @@ -136,7 +136,7 @@ func TestEmptyStateSync(t *testing.T) { func TestIterativeStateSyncIndividual(t *testing.T) { testIterativeStateSync(t, 1) } func TestIterativeStateSyncBatched(t *testing.T) { testIterativeStateSync(t, 100) } -func testIterativeStateSync(t *testing.T, batch int) { +func testIterativeStateSync(t *testing.T, count int) { // Create a random state to copy srcDb, srcRoot, srcAccounts := makeTestState() @@ -144,7 +144,7 @@ func testIterativeStateSync(t *testing.T, batch int) { dstDb := rawdb.NewMemoryDatabase() sched := NewStateSync(srcRoot, dstDb, trie.NewSyncBloom(1, dstDb)) - queue := append([]common.Hash{}, sched.Missing(batch)...) + queue := append([]common.Hash{}, sched.Missing(count)...) for len(queue) > 0 { results := make([]trie.SyncResult, len(queue)) for i, hash := range queue { @@ -157,10 +157,12 @@ func testIterativeStateSync(t *testing.T, batch int) { if _, index, err := sched.Process(results); err != nil { t.Fatalf("failed to process result #%d: %v", index, err) } - if index, err := sched.Commit(dstDb); err != nil { - t.Fatalf("failed to commit data #%d: %v", index, err) + batch := dstDb.NewBatch() + if err := sched.Commit(batch); err != nil { + t.Fatalf("failed to commit data: %v", err) } - queue = append(queue[:0], sched.Missing(batch)...) + batch.Write() + queue = append(queue[:0], sched.Missing(count)...) } // Cross check that the two states are in sync checkStateAccounts(t, dstDb, srcRoot, srcAccounts) @@ -190,9 +192,11 @@ func TestIterativeDelayedStateSync(t *testing.T) { if _, index, err := sched.Process(results); err != nil { t.Fatalf("failed to process result #%d: %v", index, err) } - if index, err := sched.Commit(dstDb); err != nil { - t.Fatalf("failed to commit data #%d: %v", index, err) + batch := dstDb.NewBatch() + if err := sched.Commit(batch); err != nil { + t.Fatalf("failed to commit data: %v", err) } + batch.Write() queue = append(queue[len(results):], sched.Missing(0)...) } // Cross check that the two states are in sync @@ -205,7 +209,7 @@ func TestIterativeDelayedStateSync(t *testing.T) { func TestIterativeRandomStateSyncIndividual(t *testing.T) { testIterativeRandomStateSync(t, 1) } func TestIterativeRandomStateSyncBatched(t *testing.T) { testIterativeRandomStateSync(t, 100) } -func testIterativeRandomStateSync(t *testing.T, batch int) { +func testIterativeRandomStateSync(t *testing.T, count int) { // Create a random state to copy srcDb, srcRoot, srcAccounts := makeTestState() @@ -214,7 +218,7 @@ func testIterativeRandomStateSync(t *testing.T, batch int) { sched := NewStateSync(srcRoot, dstDb, trie.NewSyncBloom(1, dstDb)) queue := make(map[common.Hash]struct{}) - for _, hash := range sched.Missing(batch) { + for _, hash := range sched.Missing(count) { queue[hash] = struct{}{} } for len(queue) > 0 { @@ -231,11 +235,13 @@ func testIterativeRandomStateSync(t *testing.T, batch int) { if _, index, err := sched.Process(results); err != nil { t.Fatalf("failed to process result #%d: %v", index, err) } - if index, err := sched.Commit(dstDb); err != nil { - t.Fatalf("failed to commit data #%d: %v", index, err) + batch := dstDb.NewBatch() + if err := sched.Commit(batch); err != nil { + t.Fatalf("failed to commit data: %v", err) } + batch.Write() queue = make(map[common.Hash]struct{}) - for _, hash := range sched.Missing(batch) { + for _, hash := range sched.Missing(count) { queue[hash] = struct{}{} } } @@ -277,9 +283,11 @@ func TestIterativeRandomDelayedStateSync(t *testing.T) { if _, index, err := sched.Process(results); err != nil { t.Fatalf("failed to process result #%d: %v", index, err) } - if index, err := sched.Commit(dstDb); err != nil { - t.Fatalf("failed to commit data #%d: %v", index, err) + batch := dstDb.NewBatch() + if err := sched.Commit(batch); err != nil { + t.Fatalf("failed to commit data: %v", err) } + batch.Write() for _, hash := range sched.Missing(0) { queue[hash] = struct{}{} } @@ -316,9 +324,11 @@ func TestIncompleteStateSync(t *testing.T) { if _, index, err := sched.Process(results); err != nil { t.Fatalf("failed to process result #%d: %v", index, err) } - if index, err := sched.Commit(dstDb); err != nil { - t.Fatalf("failed to commit data #%d: %v", index, err) + batch := dstDb.NewBatch() + if err := sched.Commit(batch); err != nil { + t.Fatalf("failed to commit data: %v", err) } + batch.Write() for _, result := range results { added = append(added, result.Hash) } diff --git a/eth/downloader/statesync.go b/eth/downloader/statesync.go index b422557d58c8..f875b3a84c28 100644 --- a/eth/downloader/statesync.go +++ b/eth/downloader/statesync.go @@ -347,7 +347,7 @@ func (s *stateSync) commit(force bool) error { } start := time.Now() b := s.d.stateDB.NewBatch() - if written, err := s.sched.Commit(b); written == 0 || err != nil { + if err := s.sched.Commit(b); err != nil { return err } if err := b.Write(); err != nil { diff --git a/trie/sync.go b/trie/sync.go index 6f40b45a1ebb..e5a0c174938b 100644 --- a/trie/sync.go +++ b/trie/sync.go @@ -57,14 +57,12 @@ type SyncResult struct { // persisted data items. type syncMemBatch struct { batch map[common.Hash][]byte // In-memory membatch of recently completed items - order []common.Hash // Order of completion to prevent out-of-order data loss } // newSyncMemBatch allocates a new memory-buffer for not-yet persisted trie nodes. func newSyncMemBatch() *syncMemBatch { return &syncMemBatch{ batch: make(map[common.Hash][]byte), - order: make([]common.Hash, 0, 256), } } @@ -223,20 +221,18 @@ func (s *Sync) Process(results []SyncResult) (bool, int, error) { } // Commit flushes the data stored in the internal membatch out to persistent -// storage, returning the number of items written and any occurred error. -func (s *Sync) Commit(dbw ethdb.KeyValueWriter) (int, error) { +// storage, returning any occurred error. +func (s *Sync) Commit(dbw ethdb.Batch) error { // Dump the membatch into a database dbw - for i, key := range s.membatch.order { - if err := dbw.Put(key[:], s.membatch.batch[key]); err != nil { - return i, err + for key, value := range s.membatch.batch { + if err := dbw.Put(key[:], value); err != nil { + return err } s.bloom.Add(key[:]) } - written := len(s.membatch.order) // TODO(karalabe): could an order change improve write performance? - // Drop the membatch data and return s.membatch = newSyncMemBatch() - return written, nil + return nil } // Pending returns the number of state entries currently pending for download. @@ -330,7 +326,6 @@ func (s *Sync) children(req *request, object node) ([]*request, error) { func (s *Sync) commit(req *request) (err error) { // Write the node content to the membatch s.membatch.batch[req.hash] = req.data - s.membatch.order = append(s.membatch.order, req.hash) delete(s.requests, req.hash) diff --git a/trie/sync_test.go b/trie/sync_test.go index 0621bb43570e..6025b87fccba 100644 --- a/trie/sync_test.go +++ b/trie/sync_test.go @@ -105,7 +105,7 @@ func TestEmptySync(t *testing.T) { func TestIterativeSyncIndividual(t *testing.T) { testIterativeSync(t, 1) } func TestIterativeSyncBatched(t *testing.T) { testIterativeSync(t, 100) } -func testIterativeSync(t *testing.T, batch int) { +func testIterativeSync(t *testing.T, count int) { // Create a random trie to copy srcDb, srcTrie, srcData := makeTestTrie() @@ -114,7 +114,7 @@ func testIterativeSync(t *testing.T, batch int) { triedb := NewDatabase(diskdb) sched := NewSync(srcTrie.Hash(), diskdb, nil, NewSyncBloom(1, diskdb)) - queue := append([]common.Hash{}, sched.Missing(batch)...) + queue := append([]common.Hash{}, sched.Missing(count)...) for len(queue) > 0 { results := make([]SyncResult, len(queue)) for i, hash := range queue { @@ -127,10 +127,12 @@ func testIterativeSync(t *testing.T, batch int) { if _, index, err := sched.Process(results); err != nil { t.Fatalf("failed to process result #%d: %v", index, err) } - if index, err := sched.Commit(diskdb); err != nil { - t.Fatalf("failed to commit data #%d: %v", index, err) + batch := diskdb.NewBatch() + if err := sched.Commit(batch); err != nil { + t.Fatalf("failed to commit data: %v", err) } - queue = append(queue[:0], sched.Missing(batch)...) + batch.Write() + queue = append(queue[:0], sched.Missing(count)...) } // Cross check that the two tries are in sync checkTrieContents(t, triedb, srcTrie.Hash().Bytes(), srcData) @@ -161,9 +163,11 @@ func TestIterativeDelayedSync(t *testing.T) { if _, index, err := sched.Process(results); err != nil { t.Fatalf("failed to process result #%d: %v", index, err) } - if index, err := sched.Commit(diskdb); err != nil { - t.Fatalf("failed to commit data #%d: %v", index, err) + batch := diskdb.NewBatch() + if err := sched.Commit(batch); err != nil { + t.Fatalf("failed to commit data: %v", err) } + batch.Write() queue = append(queue[len(results):], sched.Missing(10000)...) } // Cross check that the two tries are in sync @@ -176,7 +180,7 @@ func TestIterativeDelayedSync(t *testing.T) { func TestIterativeRandomSyncIndividual(t *testing.T) { testIterativeRandomSync(t, 1) } func TestIterativeRandomSyncBatched(t *testing.T) { testIterativeRandomSync(t, 100) } -func testIterativeRandomSync(t *testing.T, batch int) { +func testIterativeRandomSync(t *testing.T, count int) { // Create a random trie to copy srcDb, srcTrie, srcData := makeTestTrie() @@ -186,7 +190,7 @@ func testIterativeRandomSync(t *testing.T, batch int) { sched := NewSync(srcTrie.Hash(), diskdb, nil, NewSyncBloom(1, diskdb)) queue := make(map[common.Hash]struct{}) - for _, hash := range sched.Missing(batch) { + for _, hash := range sched.Missing(count) { queue[hash] = struct{}{} } for len(queue) > 0 { @@ -203,11 +207,13 @@ func testIterativeRandomSync(t *testing.T, batch int) { if _, index, err := sched.Process(results); err != nil { t.Fatalf("failed to process result #%d: %v", index, err) } - if index, err := sched.Commit(diskdb); err != nil { - t.Fatalf("failed to commit data #%d: %v", index, err) + batch := diskdb.NewBatch() + if err := sched.Commit(batch); err != nil { + t.Fatalf("failed to commit data: %v", err) } + batch.Write() queue = make(map[common.Hash]struct{}) - for _, hash := range sched.Missing(batch) { + for _, hash := range sched.Missing(count) { queue[hash] = struct{}{} } } @@ -248,9 +254,11 @@ func TestIterativeRandomDelayedSync(t *testing.T) { if _, index, err := sched.Process(results); err != nil { t.Fatalf("failed to process result #%d: %v", index, err) } - if index, err := sched.Commit(diskdb); err != nil { - t.Fatalf("failed to commit data #%d: %v", index, err) + batch := diskdb.NewBatch() + if err := sched.Commit(batch); err != nil { + t.Fatalf("failed to commit data: %v", err) } + batch.Write() for _, result := range results { delete(queue, result.Hash) } @@ -293,9 +301,11 @@ func TestDuplicateAvoidanceSync(t *testing.T) { if _, index, err := sched.Process(results); err != nil { t.Fatalf("failed to process result #%d: %v", index, err) } - if index, err := sched.Commit(diskdb); err != nil { - t.Fatalf("failed to commit data #%d: %v", index, err) + batch := diskdb.NewBatch() + if err := sched.Commit(batch); err != nil { + t.Fatalf("failed to commit data: %v", err) } + batch.Write() queue = append(queue[:0], sched.Missing(0)...) } // Cross check that the two tries are in sync @@ -329,9 +339,11 @@ func TestIncompleteSync(t *testing.T) { if _, index, err := sched.Process(results); err != nil { t.Fatalf("failed to process result #%d: %v", index, err) } - if index, err := sched.Commit(diskdb); err != nil { - t.Fatalf("failed to commit data #%d: %v", index, err) + batch := diskdb.NewBatch() + if err := sched.Commit(batch); err != nil { + t.Fatalf("failed to commit data: %v", err) } + batch.Write() for _, result := range results { added = append(added, result.Hash) }