Skip to content

Commit

Permalink
Cleanup: newLogger & remove core.View() (ethereum#612)
Browse files Browse the repository at this point in the history
  • Loading branch information
Mariano Cortesi authored and celo-ci-bot-user committed Nov 19, 2019
1 parent ecf58f1 commit 6de2cdd
Show file tree
Hide file tree
Showing 12 changed files with 94 additions and 109 deletions.
49 changes: 21 additions & 28 deletions consensus/istanbul/core/backlog.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,22 +42,22 @@ func (c *core) checkMessage(msgCode uint64, view *istanbul.View) error {

// Round change messages should be in the same sequence but be >= the desired round
if msgCode == istanbul.MsgRoundChange {
if view.Sequence.Cmp(c.currentView().Sequence) > 0 {
if view.Sequence.Cmp(c.current.Sequence()) > 0 {
return errFutureMessage
} else if view.Round.Cmp(c.current.DesiredRound()) < 0 {
return errOldMessage
}
return nil
}

if view.Cmp(c.currentView()) > 0 {
if view.Cmp(c.current.View()) > 0 {
return errFutureMessage
}

// Discard messages from previous views, unless they are commits from the previous sequence,
// with the same round as what we wound up finalizing, as we would be able to include those
// to create the ParentAggregatedSeal for our next proposal.
if view.Cmp(c.currentView()) < 0 {
if view.Cmp(c.current.View()) < 0 {
if msgCode == istanbul.MsgCommit {

lastSubject, err := c.backend.LastSubject()
Expand All @@ -78,11 +78,8 @@ func (c *core) checkMessage(msgCode uint64, view *istanbul.View) error {

// StateAcceptRequest only accepts istanbul.MsgPreprepare
// other messages are future messages
if c.state == StateAcceptRequest {
if msgCode > istanbul.MsgPreprepare {
return errFutureMessage
}
return nil
if c.state == StateAcceptRequest && msgCode > istanbul.MsgPreprepare {
return errFutureMessage
}

// For states(StatePreprepared, StatePrepared, StateCommitted),
Expand All @@ -91,12 +88,7 @@ func (c *core) checkMessage(msgCode uint64, view *istanbul.View) error {
}

func (c *core) storeBacklog(msg *istanbul.Message, src istanbul.Validator) {
logger := c.logger.New("from", msg.Address, "state", c.state, "func", "storeBacklog")
if c.current != nil {
logger = logger.New("cur_seq", c.current.Sequence(), "cur_round", c.current.Round())
} else {
logger = logger.New("cur_seq", 0, "cur_round", -1)
}
logger := c.newLogger("func", "storeBacklog", "from", msg.Address)

if msg.Address == c.address {
logger.Warn("Backlog from self")
Expand Down Expand Up @@ -146,7 +138,7 @@ func (c *core) processBacklog() {
continue
}

logger := c.logger.New("from", src, "state", c.state, "cur_round", c.current.Round(), "cur_seq", c.current.Sequence(), "func", "processBacklog")
logger := c.newLogger("func", "processBacklog", "from", src)
isFuture := false

// We stop processing if
Expand Down Expand Up @@ -178,28 +170,29 @@ func (c *core) processBacklog() {
view = rc.View
}
}

if view == nil {
logger.Debug("Nil view", "msg", msg)
continue
}

// Push back if it's a future message
err := c.checkMessage(msg.Code, view)
if err != nil {
if err == errFutureMessage {
logger.Trace("Stop processing backlog", "msg", msg)
backlog.Push(msg, prio)
isFuture = true
break
}
if err == nil {
logger.Trace("Post backlog event", "msg", msg)

go c.sendEvent(backlogEvent{
src: src,
msg: msg,
})
} else if err == errFutureMessage {
logger.Trace("Stop processing backlog", "msg", msg)
backlog.Push(msg, prio)
isFuture = true
} else {
logger.Trace("Skip the backlog event", "msg", msg, "err", err)
continue
}
logger.Trace("Post backlog event", "msg", msg)

go c.sendEvent(backlogEvent{
src: src,
msg: msg,
})
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion consensus/istanbul/core/backlog_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ func TestCheckMessage(t *testing.T) {
}
}

v = c.currentView()
v = c.current.View()
// current view, state = StateAcceptRequest
c.state = StateAcceptRequest
for i := 0; i < len(testCode); i++ {
Expand Down
4 changes: 2 additions & 2 deletions consensus/istanbul/core/commit.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import (
)

func (c *core) sendCommit() {
logger := c.logger.New("state", c.state, "cur_round", c.current.Round(), "cur_seq", c.current.Sequence(), "func", "sendCommit")
logger := c.newLogger("func", "sendCommit")
logger.Trace("Sending commit")
sub := c.current.Subject()
c.broadcastCommit(sub)
Expand Down Expand Up @@ -85,7 +85,7 @@ func (c *core) handleCommit(msg *istanbul.Message) error {

// Valid commit messages may be for the current, or previous sequence. We compare against our
// current view to find out which.
if commit.View.Cmp(c.currentView()) == 0 {
if commit.View.Cmp(c.current.View()) == 0 {
return c.handleCheckedCommitForCurrentSequence(msg, commit)
} else {
return c.handleCheckedCommitForPreviousSequence(msg, commit)
Expand Down
18 changes: 6 additions & 12 deletions consensus/istanbul/core/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ type core struct {
}

// Appends the current view and state to the given context.
func (c *core) NewLogger(ctx ...interface{}) log.Logger {
func (c *core) newLogger(ctx ...interface{}) log.Logger {
var seq, round *big.Int
state := c.state
if c.current != nil {
Expand All @@ -108,7 +108,7 @@ func (c *core) NewLogger(ctx ...interface{}) log.Logger {
round = big.NewInt(-1)
}
tmp := c.logger.New(ctx...)
return tmp.New("cur_seq", seq, "cur_round", round, "state", state)
return tmp.New("cur_seq", seq, "cur_round", round, "state", state, "address", c.address)
}

func (c *core) SetAddress(address common.Address) {
Expand Down Expand Up @@ -156,13 +156,6 @@ func (c *core) broadcast(msg *istanbul.Message) {
}
}

func (c *core) currentView() *istanbul.View {
return &istanbul.View{
Sequence: new(big.Int).Set(c.current.Sequence()),
Round: new(big.Int).Set(c.current.Round()),
}
}

func (c *core) isProposer() bool {
if c.valSet == nil {
return false
Expand Down Expand Up @@ -359,14 +352,15 @@ func (c *core) startNewRound(round *big.Int) {

// All actions that occur when transitioning to waiting for round change state.
func (c *core) waitForDesiredRound(r *big.Int) {
logger := c.logger.New("func", "waitForDesiredRound", "cur_round", c.current.Round(), "old_desired_round", c.current.DesiredRound(), "new_desired_round", r)
logger := c.newLogger("func", "waitForDesiredRound", "old_desired_round", c.current.DesiredRound(), "new_desired_round", r)

// Don't wait for an older round
if c.current.DesiredRound().Cmp(r) >= 0 {
logger.Debug("New desired round not greater than current desired round")
return
}
logger.Debug("Waiting for desired round")

logger.Debug("Waiting for desired round")
desiredView := &istanbul.View{
Sequence: new(big.Int).Set(c.current.Sequence()),
Round: new(big.Int).Set(r),
Expand Down Expand Up @@ -431,7 +425,7 @@ func (c *core) stopTimer() {
}

func (c *core) newRoundChangeTimer() {
c.newRoundChangeTimerForView(c.currentView())
c.newRoundChangeTimerForView(c.current.View())
}

func (c *core) newRoundChangeTimerForView(view *istanbul.View) {
Expand Down
28 changes: 9 additions & 19 deletions consensus/istanbul/core/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ func (c *core) Stop() error {
}

func (c *core) CurrentView() *istanbul.View {
return c.currentView()
return c.current.View()
}

// ----------------------------------------------------------------------------
Expand Down Expand Up @@ -144,12 +144,7 @@ func (c *core) sendEvent(ev interface{}) {
}

func (c *core) handleMsg(payload []byte) error {
logger := c.logger.New("func", "handleMsg")
if c.current != nil {
logger = logger.New("cur_seq", c.current.Sequence(), "cur_round", c.current.Round())
} else {
logger = logger.New("cur_seq", 0, "cur_round", -1)
}
logger := c.newLogger("func", "handleMsg")

// Decode message and check its signature
msg := new(istanbul.Message)
Expand All @@ -169,15 +164,10 @@ func (c *core) handleMsg(payload []byte) error {
}

func (c *core) handleCheckedMsg(msg *istanbul.Message, src istanbul.Validator) error {
logger := c.logger.New("address", c.address, "from", msg.Address, "func", "handleCheckedMsg")
if c.current != nil {
logger = logger.New("cur_seq", c.current.Sequence(), "cur_round", c.current.Round())
} else {
logger = logger.New("cur_seq", 0, "cur_round", -1)
}
logger := c.newLogger("func", "handleCheckedMsg", "from", msg.Address)

// Store the message if it's a future message
testBacklog := func(err error) error {
catchFutureMessages := func(err error) error {
if err == errFutureMessage {
c.storeBacklog(msg, src)
}
Expand All @@ -187,13 +177,13 @@ func (c *core) handleCheckedMsg(msg *istanbul.Message, src istanbul.Validator) e

switch msg.Code {
case istanbul.MsgPreprepare:
return testBacklog(c.handlePreprepare(msg))
return catchFutureMessages(c.handlePreprepare(msg))
case istanbul.MsgPrepare:
return testBacklog(c.handlePrepare(msg))
return catchFutureMessages(c.handlePrepare(msg))
case istanbul.MsgCommit:
return testBacklog(c.handleCommit(msg))
return catchFutureMessages(c.handleCommit(msg))
case istanbul.MsgRoundChange:
return testBacklog(c.handleRoundChange(msg))
return catchFutureMessages(c.handleRoundChange(msg))
default:
logger.Error("Invalid message", "msg", msg)
}
Expand All @@ -202,7 +192,7 @@ func (c *core) handleCheckedMsg(msg *istanbul.Message, src istanbul.Validator) e
}

func (c *core) handleTimeoutMsg(timeoutView *istanbul.View) {
logger := c.NewLogger("func", "handleTimeoutMsg", "round", timeoutView.Round)
logger := c.newLogger("func", "handleTimeoutMsg", "round", timeoutView.Round)
logger.Trace("Timed out, trying to wait for next round")

nextRound := new(big.Int).Add(timeoutView.Round, common.Big1)
Expand Down
8 changes: 4 additions & 4 deletions consensus/istanbul/core/prepare.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import (
)

func (c *core) sendPrepare() {
logger := c.logger.New("state", c.state, "cur_round", c.current.Round(), "cur_seq", c.current.Sequence(), "func", "sendPrepare")
logger := c.newLogger("func", "sendPrepare")

sub := c.current.Subject()
encodedSubject, err := Encode(sub)
Expand All @@ -40,7 +40,7 @@ func (c *core) sendPrepare() {
}

func (c *core) verifyPreparedCertificate(preparedCertificate istanbul.PreparedCertificate) error {
logger := c.logger.New("state", c.state, "cur_round", c.current.Round(), "cur_seq", c.current.Sequence(), "func", "verifyPreparedCertificate")
logger := c.newLogger("func", "verifyPreparedCertificate")

// Validate the attached proposal
if _, err := c.backend.Verify(preparedCertificate.Proposal); err != nil {
Expand Down Expand Up @@ -88,7 +88,7 @@ func (c *core) verifyPreparedCertificate(preparedCertificate istanbul.PreparedCe
}

// Verify message for the proper sequence.
if subject.View.Sequence.Cmp(c.currentView().Sequence) != 0 {
if subject.View.Sequence.Cmp(c.current.Sequence()) != 0 {
return errInvalidPreparedCertificateMsgView
}

Expand Down Expand Up @@ -120,7 +120,7 @@ func (c *core) verifyPreparedCertificate(preparedCertificate istanbul.PreparedCe
}

func (c *core) handlePrepare(msg *istanbul.Message) error {
logger := c.logger.New("state", c.state, "cur_round", c.current.Round(), "cur_seq", c.current.Sequence(), "func", "handlePrepare", "tag", "handleMsg")
logger := c.newLogger("func", "handlePrepare", "tag", "handleMsg")
// Decode PREPARE message
var prepare *istanbul.Subject
err := msg.Decode(&prepare)
Expand Down
6 changes: 3 additions & 3 deletions consensus/istanbul/core/preprepare.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,11 @@ import (
)

func (c *core) sendPreprepare(request *istanbul.Request, roundChangeCertificate istanbul.RoundChangeCertificate) {
logger := c.logger.New("state", c.state, "cur_round", c.current.Round(), "cur_seq", c.current.Sequence(), "func", "sendPreprepare")
logger := c.newLogger("func", "sendPreprepare")

// If I'm the proposer and I have the same sequence with the proposal
if c.current.Sequence().Cmp(request.Proposal.Number()) == 0 && c.isProposer() {
curView := c.currentView()
curView := c.current.View()
preprepare, err := Encode(&istanbul.Preprepare{
View: curView,
Proposal: request.Proposal,
Expand All @@ -50,7 +50,7 @@ func (c *core) sendPreprepare(request *istanbul.Request, roundChangeCertificate
}

func (c *core) handlePreprepare(msg *istanbul.Message) error {
logger := c.logger.New("from", msg.Address, "state", c.state, "cur_round", c.current.Round(), "cur_seq", c.current.Sequence(), "func", "handlePreprepare", "tag", "handleMsg")
logger := c.newLogger("func", "handlePreprepare", "tag", "handleMsg", "from", msg.Address)
logger.Trace("Got pre-prepare message", "msg", msg)

// Decode PRE-PREPARE
Expand Down
18 changes: 9 additions & 9 deletions consensus/istanbul/core/preprepare_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ func TestHandlePreprepare(t *testing.T) {
}(),
func(sys *testSystem) istanbul.RoundChangeCertificate {
// Duplicate messages
roundChangeCertificate := sys.getRoundChangeCertificate(t, *(sys.backends[0].engine.(*core).currentView()), istanbul.EmptyPreparedCertificate())
roundChangeCertificate := sys.getRoundChangeCertificate(t, *(sys.backends[0].engine.(*core).current.View()), istanbul.EmptyPreparedCertificate())
roundChangeCertificate.RoundChangeMessages[1] = roundChangeCertificate.RoundChangeMessages[0]
return roundChangeCertificate
},
Expand Down Expand Up @@ -201,14 +201,14 @@ func TestHandlePreprepare(t *testing.T) {
return sys
}(),
func(sys *testSystem) istanbul.RoundChangeCertificate {
view1 := *(sys.backends[0].engine.(*core).currentView())
view1 := *(sys.backends[0].engine.(*core).current.View())

var view2 istanbul.View
view2.Sequence = big.NewInt(view1.Sequence.Int64())
view2.Round = big.NewInt(view1.Round.Int64() + 1)

preparedCertificate := sys.getPreparedCertificate(t, []istanbul.View{view1, view2}, makeBlock(2))
roundChangeCertificate := sys.getRoundChangeCertificate(t, *(sys.backends[0].engine.(*core).currentView()), preparedCertificate)
roundChangeCertificate := sys.getRoundChangeCertificate(t, *(sys.backends[0].engine.(*core).current.View()), preparedCertificate)
return roundChangeCertificate
},
makeBlock(2),
Expand Down Expand Up @@ -237,8 +237,8 @@ func TestHandlePreprepare(t *testing.T) {
return sys
}(),
func(sys *testSystem) istanbul.RoundChangeCertificate {
preparedCertificate := sys.getPreparedCertificate(t, []istanbul.View{*(sys.backends[0].engine.(*core).currentView())}, makeBlock(2))
roundChangeCertificate := sys.getRoundChangeCertificate(t, *(sys.backends[0].engine.(*core).currentView()), preparedCertificate)
preparedCertificate := sys.getPreparedCertificate(t, []istanbul.View{*(sys.backends[0].engine.(*core).current.View())}, makeBlock(2))
roundChangeCertificate := sys.getRoundChangeCertificate(t, *(sys.backends[0].engine.(*core).current.View()), preparedCertificate)
return roundChangeCertificate
},
makeBlock(1),
Expand All @@ -263,8 +263,8 @@ func TestHandlePreprepare(t *testing.T) {
return sys
}(),
func(sys *testSystem) istanbul.RoundChangeCertificate {
preparedCertificate := sys.getPreparedCertificate(t, []istanbul.View{*(sys.backends[0].engine.(*core).currentView())}, makeBlock(0))
roundChangeCertificate := sys.getRoundChangeCertificate(t, *(sys.backends[0].engine.(*core).currentView()), preparedCertificate)
preparedCertificate := sys.getPreparedCertificate(t, []istanbul.View{*(sys.backends[0].engine.(*core).current.View())}, makeBlock(0))
roundChangeCertificate := sys.getRoundChangeCertificate(t, *(sys.backends[0].engine.(*core).current.View()), preparedCertificate)
return roundChangeCertificate
},
makeBlock(0),
Expand All @@ -288,7 +288,7 @@ func TestHandlePreprepare(t *testing.T) {
return sys
}(),
func(sys *testSystem) istanbul.RoundChangeCertificate {
roundChangeCertificate := sys.getRoundChangeCertificate(t, *(sys.backends[0].engine.(*core).currentView()), istanbul.EmptyPreparedCertificate())
roundChangeCertificate := sys.getRoundChangeCertificate(t, *(sys.backends[0].engine.(*core).current.View()), istanbul.EmptyPreparedCertificate())
return roundChangeCertificate
},
makeBlock(1),
Expand All @@ -305,7 +305,7 @@ OUTER:
v0 := test.system.backends[0]
r0 := v0.engine.(*core)

curView := r0.currentView()
curView := r0.current.View()

preprepareView := curView
if test.existingBlock {
Expand Down
Loading

0 comments on commit 6de2cdd

Please sign in to comment.