Skip to content

Commit

Permalink
refactor(core): unify grpc clients (#3999)
Browse files Browse the repository at this point in the history
  • Loading branch information
vgonkivs committed Jan 20, 2025
1 parent 4885326 commit 5cdb112
Show file tree
Hide file tree
Showing 22 changed files with 228 additions and 370 deletions.
67 changes: 0 additions & 67 deletions core/client.go

This file was deleted.

5 changes: 2 additions & 3 deletions core/exchange_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ func TestExchange_StoreHistoricIfArchival(t *testing.T) {

// initialize store with genesis block
genHeight := int64(1)
genBlock, err := fetcher.GetBlock(ctx, &genHeight)
genBlock, err := fetcher.GetBlock(ctx, genHeight)
require.NoError(t, err)
genHeader, err := ce.Get(ctx, genBlock.Header.Hash().Bytes())
require.NoError(t, err)
Expand Down Expand Up @@ -170,8 +170,7 @@ func createCoreFetcher(t *testing.T, cfg *testnode.Config) (*BlockFetcher, testn
require.NoError(t, err)
host, port, err := net.SplitHostPort(cctx.GRPCClient.Target())
require.NoError(t, err)
client := NewClient(host, port)
require.NoError(t, client.Start())
client := newTestClient(t, host, port)
fetcher, err := NewBlockFetcher(client)
require.NoError(t, err)
return fetcher, cctx
Expand Down
42 changes: 4 additions & 38 deletions core/fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package core

import (
"context"
"errors"
"fmt"
"io"
"sync/atomic"
Expand All @@ -13,14 +12,13 @@ import (
tmproto "github.com/tendermint/tendermint/proto/tendermint/types"
coregrpc "github.com/tendermint/tendermint/rpc/grpc"
"github.com/tendermint/tendermint/types"
"google.golang.org/grpc"

libhead "github.com/celestiaorg/go-header"
)

const newBlockSubscriber = "NewBlock/Events"

var ErrClientNotRunning = errors.New("gRPC connection to core node is not running")

type SignedBlock struct {
Header *types.Header `json:"header"`
Commit *types.Commit `json:"commit"`
Expand All @@ -34,17 +32,17 @@ var (
)

type BlockFetcher struct {
client *Client
client coregrpc.BlockAPIClient

doneCh chan struct{}
cancel context.CancelFunc
isListeningForBlocks atomic.Bool
}

// NewBlockFetcher returns a new `BlockFetcher`.
func NewBlockFetcher(client *Client) (*BlockFetcher, error) {
func NewBlockFetcher(conn *grpc.ClientConn) (*BlockFetcher, error) {
return &BlockFetcher{
client: client,
client: coregrpc.NewBlockAPIClient(conn),
}, nil
}

Expand All @@ -62,10 +60,6 @@ func (f *BlockFetcher) Stop(ctx context.Context) error {

// GetBlockInfo queries Core for additional block information, like Commit and ValidatorSet.
func (f *BlockFetcher) GetBlockInfo(ctx context.Context, height int64) (*types.Commit, *types.ValidatorSet, error) {
// return error if the client is still not started
if !f.client.IsRunning() {
return nil, nil, ErrClientNotRunning
}
commit, err := f.Commit(ctx, height)
if err != nil {
return nil, nil, fmt.Errorf("core/fetcher: getting commit at height %d: %w", height, err)
Expand All @@ -87,10 +81,6 @@ func (f *BlockFetcher) GetBlockInfo(ctx context.Context, height int64) (*types.C
// GetBlock queries Core for a `Block` at the given height.
// if the height is nil, use the latest height
func (f *BlockFetcher) GetBlock(ctx context.Context, height int64) (*SignedBlock, error) {
// return error if the client is still not started
if !f.client.IsRunning() {
return nil, ErrClientNotRunning
}
stream, err := f.client.BlockByHeight(ctx, &coregrpc.BlockByHeightRequest{Height: height})
if err != nil {
return nil, err
Expand All @@ -103,10 +93,6 @@ func (f *BlockFetcher) GetBlock(ctx context.Context, height int64) (*SignedBlock
}

func (f *BlockFetcher) GetBlockByHash(ctx context.Context, hash libhead.Hash) (*types.Block, error) {
// return error if the client is still not started
if !f.client.IsRunning() {
return nil, ErrClientNotRunning
}
if hash == nil {
return nil, fmt.Errorf("cannot get block with nil hash")
}
Expand All @@ -125,10 +111,6 @@ func (f *BlockFetcher) GetBlockByHash(ctx context.Context, hash libhead.Hash) (*
// GetSignedBlock queries Core for a `Block` at the given height.
// if the height is nil, use the latest height.
func (f *BlockFetcher) GetSignedBlock(ctx context.Context, height int64) (*SignedBlock, error) {
// return error if the client is still not started
if !f.client.IsRunning() {
return nil, ErrClientNotRunning
}
stream, err := f.client.BlockByHeight(ctx, &coregrpc.BlockByHeightRequest{Height: height})
if err != nil {
return nil, err
Expand All @@ -140,10 +122,6 @@ func (f *BlockFetcher) GetSignedBlock(ctx context.Context, height int64) (*Signe
// the given height.
// If the height is nil, use the latest height.
func (f *BlockFetcher) Commit(ctx context.Context, height int64) (*types.Commit, error) {
// return error if the client is still not started
if !f.client.IsRunning() {
return nil, ErrClientNotRunning
}
res, err := f.client.Commit(ctx, &coregrpc.CommitRequest{Height: height})
if err != nil {
return nil, err
Expand All @@ -165,10 +143,6 @@ func (f *BlockFetcher) Commit(ctx context.Context, height int64) (*types.Commit,
// block at the given height.
// If the height is nil, use the latest height.
func (f *BlockFetcher) ValidatorSet(ctx context.Context, height int64) (*types.ValidatorSet, error) {
// return error if the client is still not started
if !f.client.IsRunning() {
return nil, ErrClientNotRunning
}
res, err := f.client.ValidatorSet(ctx, &coregrpc.ValidatorSetRequest{Height: height})
if err != nil {
return nil, err
Expand All @@ -189,10 +163,6 @@ func (f *BlockFetcher) ValidatorSet(ctx context.Context, height int64) (*types.V
// SubscribeNewBlockEvent subscribes to new block events from Core, returning
// a new block event channel on success.
func (f *BlockFetcher) SubscribeNewBlockEvent(ctx context.Context) (<-chan types.EventDataSignedBlock, error) {
// return error if the client is still not started
if !f.client.IsRunning() {
return nil, ErrClientNotRunning
}
if f.isListeningForBlocks.Load() {
return nil, fmt.Errorf("already subscribed to new blocks")
}
Expand Down Expand Up @@ -252,10 +222,6 @@ func (f *BlockFetcher) SubscribeNewBlockEvent(ctx context.Context) (<-chan types
// syncing, and false for already caught up. It can also return an error
// in the case of a failed status request.
func (f *BlockFetcher) IsSyncing(ctx context.Context) (bool, error) {
// return error if the client is still not started
if !f.client.IsRunning() {
return false, ErrClientNotRunning
}
resp, err := f.client.Status(ctx, &coregrpc.StatusRequest{})
if err != nil {
return false, err
Expand Down
3 changes: 1 addition & 2 deletions core/fetcher_no_race_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,7 @@ func TestBlockFetcherHeaderValues(t *testing.T) {
node := StartTestNode(t)
host, port, err := net.SplitHostPort(node.GRPCClient.Target())
require.NoError(t, err)
client := NewClient(host, port)
require.NoError(t, client.Start())
client := newTestClient(t, host, port)
fetcher, err := NewBlockFetcher(client)
require.NoError(t, err)

Expand Down
4 changes: 1 addition & 3 deletions core/fetcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,9 @@ func TestBlockFetcher_GetBlock_and_SubscribeNewBlockEvent(t *testing.T) {

host, port, err := net.SplitHostPort(StartTestNode(t).GRPCClient.Target())
require.NoError(t, err)
client := NewClient(host, port)
require.NoError(t, client.Start())
client := newTestClient(t, host, port)
fetcher, err := NewBlockFetcher(client)
require.NoError(t, err)

// generate some blocks
newBlockChan, err := fetcher.SubscribeNewBlockEvent(ctx)
require.NoError(t, err)
Expand Down
4 changes: 1 addition & 3 deletions core/header_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,9 @@ func TestMakeExtendedHeaderForEmptyBlock(t *testing.T) {

host, port, err := net.SplitHostPort(StartTestNode(t).GRPCClient.Target())
require.NoError(t, err)
client := NewClient(host, port)
require.NoError(t, client.Start())
client := newTestClient(t, host, port)
fetcher, err := NewBlockFetcher(client)
require.NoError(t, err)

sub, err := fetcher.SubscribeNewBlockEvent(ctx)
require.NoError(t, err)
<-sub
Expand Down
19 changes: 19 additions & 0 deletions core/testing.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,16 @@
package core

import (
"context"
"net"
"testing"
"time"

"github.com/stretchr/testify/require"
tmrand "github.com/tendermint/tendermint/libs/rand"
"google.golang.org/grpc"
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/credentials/insecure"

"github.com/celestiaorg/celestia-app/v3/test/util/genesis"
"github.com/celestiaorg/celestia-app/v3/test/util/testnode"
Expand Down Expand Up @@ -60,3 +66,16 @@ func generateRandomAccounts(n int) []string {
}
return accounts
}

func newTestClient(t *testing.T, ip, port string) *grpc.ClientConn {
t.Helper()
opt := grpc.WithTransportCredentials(insecure.NewCredentials())
endpoint := net.JoinHostPort(ip, port)
client, err := grpc.NewClient(endpoint, opt)
require.NoError(t, err)
ctx, cancel := context.WithTimeout(context.Background(), time.Second*2)
t.Cleanup(cancel)
ready := client.WaitForStateChange(ctx, connectivity.Ready)
require.True(t, ready)
return client
}
22 changes: 11 additions & 11 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ require (
github.com/BurntSushi/toml v1.4.1-0.20240526193622-a339e1f7089c
github.com/alecthomas/jsonschema v0.0.0-20220216202328-9eeeec9d044b
github.com/benbjohnson/clock v1.3.5
github.com/celestiaorg/celestia-app/v3 v3.2.0
github.com/celestiaorg/celestia-app/v3 v3.3.0-rc0
github.com/celestiaorg/go-fraud v0.2.1
github.com/celestiaorg/go-header v0.6.3
github.com/celestiaorg/go-libp2p-messenger v0.2.0
Expand Down Expand Up @@ -55,7 +55,7 @@ require (
github.com/rollkit/go-da v0.8.0
github.com/spf13/cobra v1.8.1
github.com/spf13/pflag v1.0.5
github.com/stretchr/testify v1.9.0
github.com/stretchr/testify v1.10.0
github.com/tendermint/tendermint v0.34.29
go.opentelemetry.io/contrib/instrumentation/runtime v0.45.0
go.opentelemetry.io/otel v1.31.0
Expand All @@ -69,10 +69,10 @@ require (
go.opentelemetry.io/proto/otlp v1.3.1
go.uber.org/fx v1.23.0
go.uber.org/zap v1.27.0
golang.org/x/crypto v0.29.0
golang.org/x/crypto v0.31.0
golang.org/x/exp v0.0.0-20241009180824-f66d83c29e7c
golang.org/x/sync v0.9.0
golang.org/x/text v0.20.0
golang.org/x/sync v0.10.0
golang.org/x/text v0.21.0
google.golang.org/grpc v1.68.0
google.golang.org/protobuf v1.35.1
)
Expand Down Expand Up @@ -128,7 +128,7 @@ require (
github.com/cosmos/iavl v0.19.6 // indirect
github.com/cosmos/ibc-apps/middleware/packet-forward-middleware/v6 v6.1.2 // indirect
github.com/cosmos/ibc-go/v6 v6.3.0 // indirect
github.com/cosmos/ledger-cosmos-go v0.13.2 // indirect
github.com/cosmos/ledger-cosmos-go v0.14.0 // indirect
github.com/crate-crypto/go-ipa v0.0.0-20240223125850-b1e8a79f509c // indirect
github.com/crate-crypto/go-kzg-4844 v1.0.0 // indirect
github.com/cskr/pubsub v1.0.2 // indirect
Expand Down Expand Up @@ -330,10 +330,10 @@ require (
go.uber.org/mock v0.5.0 // indirect
go.uber.org/multierr v1.11.0 // indirect
golang.org/x/mod v0.21.0 // indirect
golang.org/x/net v0.31.0 // indirect
golang.org/x/net v0.33.0 // indirect
golang.org/x/oauth2 v0.23.0 // indirect
golang.org/x/sys v0.27.0 // indirect
golang.org/x/term v0.26.0 // indirect
golang.org/x/sys v0.28.0 // indirect
golang.org/x/term v0.27.0 // indirect
golang.org/x/time v0.5.0 // indirect
golang.org/x/tools v0.26.0 // indirect
golang.org/x/xerrors v0.0.0-20240903120638-7835f813f4da // indirect
Expand All @@ -352,12 +352,12 @@ require (
)

replace (
github.com/cosmos/cosmos-sdk => github.com/celestiaorg/cosmos-sdk v1.25.1-sdk-v0.46.16
github.com/cosmos/cosmos-sdk => github.com/celestiaorg/cosmos-sdk v1.27.0-sdk-v0.46.16
github.com/filecoin-project/dagstore => github.com/celestiaorg/dagstore v0.0.0-20230824094345-537c012aa403
github.com/gogo/protobuf => github.com/regen-network/protobuf v1.3.3-alpha.regen.1
// broken goleveldb needs to be replaced for the cosmos-sdk and celestia-app
github.com/syndtr/goleveldb => github.com/syndtr/goleveldb v1.0.1-0.20210819022825-2ae1ddf74ef7
github.com/tendermint/tendermint => github.com/celestiaorg/celestia-core v1.44.1-tm-v0.34.35
github.com/tendermint/tendermint => github.com/celestiaorg/celestia-core v1.45.0-tm-v0.34.35
)

replace github.com/ipfs/boxo => github.com/celestiaorg/boxo v0.0.0-20241118122411-70a650316c3b
Loading

0 comments on commit 5cdb112

Please sign in to comment.