From b3de53798fb329c2647ff703b3a5ab985f8469fe Mon Sep 17 00:00:00 2001 From: rjl493456442 Date: Sun, 4 Mar 2018 21:11:03 +0800 Subject: [PATCH 1/2] eth/downloader: flush state data before exit --- eth/downloader/statesync.go | 20 +++++++++++++------- trie/sync.go | 2 +- 2 files changed, 14 insertions(+), 8 deletions(-) diff --git a/eth/downloader/statesync.go b/eth/downloader/statesync.go index ee6c7b491401..dd866e323cb7 100644 --- a/eth/downloader/statesync.go +++ b/eth/downloader/statesync.go @@ -274,16 +274,19 @@ func (s *stateSync) Cancel() error { // receive data from peers, rather those are buffered up in the downloader and // pushed here async. The reason is to decouple processing from data receipt // and timeouts. -func (s *stateSync) loop() error { +func (s *stateSync) loop() (err error) { // Listen for new peer events to assign tasks to them newPeer := make(chan *peerConnection, 1024) peerSub := s.d.peers.SubscribeNewPeers(newPeer) defer peerSub.Unsubscribe() + defer func() { + err = s.commit(true) + }() // Keep assigning new tasks until the sync completes or aborts for s.sched.Pending() > 0 { - if err := s.commit(false); err != nil { - return err + if err = s.commit(false); err != nil { + return } s.assignTasks() // Tasks assigned, wait for something to happen @@ -307,14 +310,14 @@ func (s *stateSync) loop() error { s.d.dropPeer(req.peer.id) } // Process all the received blobs and check for stale delivery - if err := s.process(req); err != nil { + if err = s.process(req); err != nil { log.Warn("Node data write error", "err", err) - return err + return } req.peer.SetNodeDataIdle(len(req.response)) } } - return s.commit(true) + return } func (s *stateSync) commit(force bool) error { @@ -323,7 +326,10 @@ func (s *stateSync) commit(force bool) error { } start := time.Now() b := s.d.stateDB.NewBatch() - s.sched.Commit(b) + // Ignore empty write. + if written, err := s.sched.Commit(b); written == 0 || err != nil { + return err + } if err := b.Write(); err != nil { return fmt.Errorf("DB write error: %v", err) } diff --git a/trie/sync.go b/trie/sync.go index b573a9f73245..013de7c38f11 100644 --- a/trie/sync.go +++ b/trie/sync.go @@ -212,7 +212,7 @@ func (s *TrieSync) Process(results []SyncResult) (bool, int, error) { } // Commit flushes the data stored in the internal membatch out to persistent -// storage, returning th enumber of items written and any occurred error. +// storage, returning th number of items written and any occurred error. func (s *TrieSync) Commit(dbw ethdb.Putter) (int, error) { // Dump the membatch into a database dbw for i, key := range s.membatch.order { From 1a243afb4dcbfc2ea935e82d5d7141829ae6332a Mon Sep 17 00:00:00 2001 From: rjl493456442 Date: Fri, 16 Mar 2018 22:49:56 +0800 Subject: [PATCH 2/2] trie, eth: minor fix --- eth/downloader/statesync.go | 12 +++++++----- trie/sync.go | 2 +- 2 files changed, 8 insertions(+), 6 deletions(-) diff --git a/eth/downloader/statesync.go b/eth/downloader/statesync.go index dd866e323cb7..d1d3a3d9746a 100644 --- a/eth/downloader/statesync.go +++ b/eth/downloader/statesync.go @@ -280,13 +280,16 @@ func (s *stateSync) loop() (err error) { peerSub := s.d.peers.SubscribeNewPeers(newPeer) defer peerSub.Unsubscribe() defer func() { - err = s.commit(true) + cerr := s.commit(true) + if err == nil { + err = cerr + } }() // Keep assigning new tasks until the sync completes or aborts for s.sched.Pending() > 0 { if err = s.commit(false); err != nil { - return + return err } s.assignTasks() // Tasks assigned, wait for something to happen @@ -312,12 +315,12 @@ func (s *stateSync) loop() (err error) { // Process all the received blobs and check for stale delivery if err = s.process(req); err != nil { log.Warn("Node data write error", "err", err) - return + return err } req.peer.SetNodeDataIdle(len(req.response)) } } - return + return nil } func (s *stateSync) commit(force bool) error { @@ -326,7 +329,6 @@ func (s *stateSync) commit(force bool) error { } start := time.Now() b := s.d.stateDB.NewBatch() - // Ignore empty write. if written, err := s.sched.Commit(b); written == 0 || err != nil { return err } diff --git a/trie/sync.go b/trie/sync.go index 013de7c38f11..4ae975d0428c 100644 --- a/trie/sync.go +++ b/trie/sync.go @@ -212,7 +212,7 @@ func (s *TrieSync) Process(results []SyncResult) (bool, int, error) { } // Commit flushes the data stored in the internal membatch out to persistent -// storage, returning th number of items written and any occurred error. +// storage, returning the number of items written and any occurred error. func (s *TrieSync) Commit(dbw ethdb.Putter) (int, error) { // Dump the membatch into a database dbw for i, key := range s.membatch.order {