diff --git a/gno.land/pkg/integration/testing_node.go b/gno.land/pkg/integration/testing_node.go index b0aa9f7c2af..cc0262b8105 100644 --- a/gno.land/pkg/integration/testing_node.go +++ b/gno.land/pkg/integration/testing_node.go @@ -32,8 +32,8 @@ func TestingInMemoryNode(t TestingTS, logger *slog.Logger, config *gnoland.InMem require.NoError(t, err) select { - case <-gnoland.GetNodeReadiness(node): - case <-time.After(time.Second * 6): + case <-node.Ready(): + case <-time.After(time.Second * 10): require.FailNow(t, "timeout while waiting for the node to start") } diff --git a/tm2/pkg/bft/node/node.go b/tm2/pkg/bft/node/node.go index a6c67fa2485..57cc18b8ad7 100644 --- a/tm2/pkg/bft/node/node.go +++ b/tm2/pkg/bft/node/node.go @@ -9,6 +9,7 @@ import ( "net/http" _ "net/http/pprof" //nolint:gosec "strings" + "sync" "time" "golang.org/x/exp/slog" @@ -168,6 +169,7 @@ type Node struct { rpcListeners []net.Listener // rpc servers txEventStore eventstore.TxEventStore eventStoreService *eventstore.Service + firstBlockSignal <-chan struct{} } func initDBs(config *cfg.Config, dbProvider DBProvider) (blockStore *store.BlockStore, stateDB dbm.DB, err error) { @@ -439,6 +441,20 @@ func NewNode(config *cfg.Config, // but before it indexed the txs, or, endblocker panicked) evsw := events.NewEventSwitch() + // Signal readiness when receiving the first block. + const readinessListenerID = "first_block_listener" + + cFirstBlock := make(chan struct{}) + var once sync.Once + evsw.AddListener(readinessListenerID, func(ev events.Event) { + if _, ok := ev.(types.EventNewBlock); ok { + once.Do(func() { + close(cFirstBlock) + evsw.RemoveListener(readinessListenerID) + }) + } + }) + // Transaction event storing eventStoreService, txEventStore, err := createAndStartEventStoreService(config, evsw, logger) if err != nil { @@ -554,6 +570,7 @@ func NewNode(config *cfg.Config, proxyApp: proxyApp, txEventStore: txEventStore, eventStoreService: eventStoreService, + firstBlockSignal: cFirstBlock, } node.BaseService = *service.NewBaseService(logger, "Node", node) @@ -653,6 +670,11 @@ func (n *Node) OnStop() { } } +// Ready signals that the node is ready by returning a blocking channel. This channel is closed when the node receives its first block. +func (n *Node) Ready() <-chan struct{} { + return n.firstBlockSignal +} + // ConfigureRPC sets all variables in rpccore so they will serve // rpc calls from this node func (n *Node) ConfigureRPC() { diff --git a/tm2/pkg/bft/node/node_test.go b/tm2/pkg/bft/node/node_test.go index d182b7fb0d5..3057a41b5f3 100644 --- a/tm2/pkg/bft/node/node_test.go +++ b/tm2/pkg/bft/node/node_test.go @@ -108,6 +108,39 @@ func TestNodeDelayedStart(t *testing.T) { assert.Equal(t, true, startTime.After(n.GenesisDoc().GenesisTime)) } +func TestNodeReady(t *testing.T) { + config := cfg.ResetTestRoot("node_node_test") + defer os.RemoveAll(config.RootDir) + + // Create & start node + n, err := DefaultNewNode(config, log.NewTestingLogger(t)) + require.NoError(t, err) + + // Assert that blockstore has zero block before waiting for the first block + require.Equal(t, int64(0), n.BlockStore().Height()) + + // Assert that first block signal is not alreay received by calling Ready + select { + case <-n.Ready(): + require.FailNow(t, "first block signal should not be close before starting the node") + default: // ok + } + + err = n.Start() + require.NoError(t, err) + defer n.Stop() + + // Wait until the node is ready or timeout + select { + case <-time.After(time.Second): + require.FailNow(t, "timeout while waiting for first block signal") + case <-n.Ready(): // ready + } + + // Check that blockstore have at last one block + require.GreaterOrEqual(t, n.BlockStore().Height(), int64(1)) +} + func TestNodeSetAppVersion(t *testing.T) { config := cfg.ResetTestRoot("node_app_version_test") defer os.RemoveAll(config.RootDir)