Skip to content

Commit

Permalink
fix(core/fetcher): resubscribe if consensus node goes offline (#4096)
Browse files Browse the repository at this point in the history
  • Loading branch information
vgonkivs authored Feb 7, 2025
1 parent 482f452 commit b2e8967
Show file tree
Hide file tree
Showing 2 changed files with 140 additions and 0 deletions.
10 changes: 10 additions & 0 deletions core/fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
coregrpc "github.com/tendermint/tendermint/rpc/grpc"
"github.com/tendermint/tendermint/types"
"google.golang.org/grpc"
"google.golang.org/grpc/status"

libhead "github.com/celestiaorg/go-header"
)
Expand Down Expand Up @@ -173,9 +174,12 @@ func (f *BlockFetcher) SubscribeNewBlockEvent(ctx context.Context) (<-chan types

subscription, err := f.client.SubscribeNewHeights(ctx, &coregrpc.SubscribeNewHeightsRequest{})
if err != nil {
close(f.doneCh)
f.isListeningForBlocks.Store(false)
return nil, err
}

log.Debug("created a subscription. Start listening for new blocks...")
signedBlockCh := make(chan types.EventDataSignedBlock)
go func() {
defer close(f.doneCh)
Expand All @@ -189,6 +193,12 @@ func (f *BlockFetcher) SubscribeNewBlockEvent(ctx context.Context) (<-chan types
resp, err := subscription.Recv()
if err != nil {
log.Errorw("fetcher: error receiving new height", "err", err.Error())
_, ok := status.FromError(err) // parsing the gRPC error
if ok {
// ok means that err contains a gRPC status error.
// move on another round of resubscribing.
return
}
continue
}
withTimeout, ctxCancel := context.WithTimeout(ctx, 10*time.Second)
Expand Down
130 changes: 130 additions & 0 deletions core/fetcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,15 @@ package core

import (
"context"
"errors"
"net"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
coregrpc "github.com/tendermint/tendermint/rpc/grpc"
"google.golang.org/grpc"
)

func TestBlockFetcher_GetBlock_and_SubscribeNewBlockEvent(t *testing.T) {
Expand Down Expand Up @@ -40,3 +43,130 @@ func TestBlockFetcher_GetBlock_and_SubscribeNewBlockEvent(t *testing.T) {
}
require.NoError(t, fetcher.Stop(ctx))
}

// mockAPIService is a test double that is registered as a BlockAPI gRPC server
// (see Start) and answers requests with data obtained through the wrapped
// BlockFetcher.
type mockAPIService struct {
// Embedded generated type; presumably supplies default implementations for the
// BlockAPI methods not overridden by this mock — confirm against the generated code.
coregrpc.UnimplementedBlockAPIServer

// grpcServer is the server created by Start and shut down by Stop.
grpcServer *grpc.Server
// fetcher is the source of blocks for the handlers; it is assigned in
// generateBlocksWithHeights.
fetcher *BlockFetcher
}

// SubscribeNewHeights streams up to 20 NewHeightEvent messages to the subscriber,
// waiting one second after each one.
//
// For every i in [0, 20) it asks the wrapped fetcher for block i and sends the
// height and header hash of the returned block. It returns nil once all events
// were sent, or the first error reported by the fetcher, by Send, or by the
// stream context when the subscriber disconnects or the server is stopped.
func (m *mockAPIService) SubscribeNewHeights(
	_ *coregrpc.SubscribeNewHeightsRequest,
	srv coregrpc.BlockAPI_SubscribeNewHeightsServer,
) error {
	// Bind all work to the stream's lifetime so the handler does not keep
	// fetching and sleeping for a subscriber that is already gone.
	ctx := srv.Context()
	for i := 0; i < 20; i++ {
		b, err := m.fetcher.GetBlock(ctx, int64(i))
		if err != nil {
			return err
		}
		err = srv.Send(&coregrpc.NewHeightEvent{Height: b.Header.Height, Hash: b.Header.Hash()})
		if err != nil {
			return err
		}

		// Cancellable replacement for a plain sleep between events.
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-time.After(time.Second):
		}
	}
	return nil
}

// BlockByHeight forwards the request to the client of the wrapped fetcher and
// relays the first message of the upstream response to the caller.
//
// Any error from opening the upstream stream, receiving from it, or sending to
// the caller is returned unchanged.
func (m *mockAPIService) BlockByHeight(
	req *coregrpc.BlockByHeightRequest,
	srv coregrpc.BlockAPI_BlockByHeightServer,
) error {
	upstreamReq := &coregrpc.BlockByHeightRequest{Height: req.Height}
	upstream, err := m.fetcher.client.BlockByHeight(context.Background(), upstreamReq)
	if err != nil {
		return err
	}

	// Only a single message is read from the upstream stream and forwarded.
	msg, err := upstream.Recv()
	if err != nil {
		return err
	}
	return srv.Send(msg)
}

// Start binds a TCP listener on port 50051, creates a fresh gRPC server with
// this mock registered as the BlockAPI implementation, and serves it in a
// background goroutine.
//
// It returns an error only if the listener cannot be created. A Serve failure
// other than grpc.ErrServerStopped panics in the background goroutine.
func (m *mockAPIService) Start() error {
	listener, err := net.Listen("tcp", ":50051")
	if err != nil {
		return err
	}

	grpcServer := grpc.NewServer()
	m.grpcServer = grpcServer
	coregrpc.RegisterBlockAPIServer(grpcServer, m)
	go func() {
		// Keep the error local to the goroutine instead of writing to the
		// enclosing function's err after Start has already returned.
		if err := grpcServer.Serve(listener); err != nil && !errors.Is(err, grpc.ErrServerStopped) {
			panic(err)
		}
	}()
	return nil
}

// Stop shuts down the gRPC server created by Start. It always returns nil and
// is a no-op when no server has been started yet.
func (m *mockAPIService) Stop() error {
	// Guard against Stop being called before a successful Start, which would
	// otherwise dereference a nil server.
	if m.grpcServer == nil {
		return nil
	}
	m.grpcServer.Stop()
	return nil
}

// generateBlocksWithHeights creates a core fetcher from the default test
// config, stores it on the mock, generates non-empty blocks through it, and
// then stops the fetcher. Any failure aborts the calling test.
func (m *mockAPIService) generateBlocksWithHeights(ctx context.Context, t *testing.T) {
	// Report assertion failures at the caller's line rather than inside this helper.
	t.Helper()

	cfg := DefaultTestConfig()
	fetcher, cctx := createCoreFetcher(t, cfg)
	m.fetcher = fetcher
	generateNonEmptyBlocks(t, ctx, fetcher, cfg, cctx)
	require.NoError(t, fetcher.Stop(ctx))
}

// TestStart_SubscribeNewBlockEvent_Resubscription ensures that a subscription does not get
// stuck when the gRPC server is stopped: the block channel must be closed, and a new
// subscription must deliver blocks again once the server is back.
func TestStart_SubscribeNewBlockEvent_Resubscription(t *testing.T) {
	ctx, cancel := context.WithTimeout(context.Background(), time.Second*30)
	t.Cleanup(cancel)
	m := &mockAPIService{}
	m.generateBlocksWithHeights(ctx, t)

	require.NoError(t, m.Start())

	client := newTestClient(t, "localhost", "50051")

	fetcher, err := NewBlockFetcher(client)
	require.NoError(t, err)
	// subscribe to block event to get blocks
	newBlockChan, err := fetcher.SubscribeNewBlockEvent(ctx)
	require.NoError(t, err)

	// requireNextBlock waits for one block on the current subscription channel and
	// checks that the matching signed block can be fetched. It reads newBlockChan
	// through the closure, so it always follows the latest subscription.
	requireNextBlock := func() {
		t.Helper()
		select {
		case blk, ok := <-newBlockChan:
			require.True(t, ok, "subscription channel closed before delivering a block")
			_, err := fetcher.GetSignedBlock(ctx, blk.Header.Height)
			require.NoError(t, err)
		case <-ctx.Done():
			require.NoError(t, ctx.Err())
		}
	}

	requireNextBlock()

	require.NoError(t, m.Stop())

	// stopping the server sends an error with the status code
	// to client, so the subscription loop will be finished.
	// check that newBlockChan was closed. Blocks that were already in flight are
	// drained, and the wait is bounded by ctx so a missing close fails the test
	// instead of hanging it.
	for closed := false; !closed; {
		select {
		case _, ok := <-newBlockChan:
			closed = !ok
		case <-ctx.Done():
			require.NoError(t, ctx.Err(), "block channel was not closed after the server stopped")
		}
	}

	// start server and try to get a new subscription
	require.NoError(t, m.Start())
	newBlockChan, err = fetcher.SubscribeNewBlockEvent(ctx)
	require.NoError(t, err)

	requireNextBlock()

	require.NoError(t, m.Stop())
	require.NoError(t, m.fetcher.Stop(ctx))
	require.NoError(t, fetcher.Stop(ctx))
}

0 comments on commit b2e8967

Please sign in to comment.