diff --git a/tm2/pkg/bft/consensus/common_test.go b/tm2/pkg/bft/consensus/common_test.go index ba19881aace..db54359f76e 100644 --- a/tm2/pkg/bft/consensus/common_test.go +++ b/tm2/pkg/bft/consensus/common_test.go @@ -6,6 +6,7 @@ import ( "os" "path" "path/filepath" + "reflect" "sort" "sync" "testing" @@ -245,8 +246,34 @@ func validatePrevoteAndPrecommit(t *testing.T, cs *ConsensusState, thisRound, lo validatePrecommit(t, cs, thisRound, lockRound, privVal, votedBlockHash, lockedBlockHash) } +func bufferedEventChannel(in <-chan events.Event, size int) (out chan events.Event) { + out = make(chan events.Event, size) + go func() { + defer close(out) + for evt := range in { + out <- evt + } + }() + + return out +} + +func subscribe(evsw events.EventSwitch, protoevent events.Event) <-chan events.Event { + name := reflect.ValueOf(protoevent).Type().Name() + listenerID := fmt.Sprintf("%s-%s", testSubscriber, name) + ch := events.SubscribeToEvent(evsw, listenerID, protoevent) + + // Similar to subscribeToVoter, this modification introduces + // a buffered channel to ensures that events are consumed + // asynchronously, thereby avoiding the deadlock situation described in + // #1320 where the eventSwitch.FireEvent method was blocked. + bch := bufferedEventChannel(ch, 16) + + return bch +} + func subscribeToVoter(cs *ConsensusState, addr crypto.Address) <-chan events.Event { - return events.SubscribeFiltered(cs.evsw, testSubscriber, func(event events.Event) bool { + ch := events.SubscribeFiltered(cs.evsw, testSubscriber, func(event events.Event) bool { if vote, ok := event.(types.EventVote); ok { if vote.Vote.ValidatorAddress == addr { return true @@ -254,6 +281,15 @@ func subscribeToVoter(cs *ConsensusState, addr crypto.Address) <-chan events.Eve } return false }) + + // This modification addresses the deadlock issue outlined in issue + // #1320. By creating a buffered channel, we ensure that events are + // consumed even if the main thread is blocked. This prevents the + // deadlock that occurred when eventSwitch.FireEvent was blocked due to + // no available consumers for the event. + bch := bufferedEventChannel(ch, 16) + + return bch } // ------------------------------------------------------------------------------- diff --git a/tm2/pkg/bft/consensus/state_test.go b/tm2/pkg/bft/consensus/state_test.go index 35877837ab3..e31817cf9cc 100644 --- a/tm2/pkg/bft/consensus/state_test.go +++ b/tm2/pkg/bft/consensus/state_test.go @@ -11,7 +11,6 @@ import ( cstypes "github.com/gnolang/gno/tm2/pkg/bft/consensus/types" "github.com/gnolang/gno/tm2/pkg/bft/types" - "github.com/gnolang/gno/tm2/pkg/events" p2pmock "github.com/gnolang/gno/tm2/pkg/p2p/mock" "github.com/gnolang/gno/tm2/pkg/random" "github.com/gnolang/gno/tm2/pkg/testutils" @@ -1778,7 +1777,3 @@ func TestStateOutputVoteStats(t *testing.T) { case <-time.After(50 * time.Millisecond): } } - -func subscribe(evsw events.EventSwitch, protoevent events.Event) <-chan events.Event { - return events.SubscribeToEvent(evsw, testSubscriber, protoevent) -}