From 5cdb1121c89854643165171309fee693ea1f02bf Mon Sep 17 00:00:00 2001 From: Viacheslav Date: Fri, 13 Dec 2024 17:17:26 +0200 Subject: [PATCH] refactor(core): unify grpc clients (#3999) --- core/client.go | 67 ----------------- core/exchange_test.go | 5 +- core/fetcher.go | 42 ++--------- core/fetcher_no_race_test.go | 3 +- core/fetcher_test.go | 4 +- core/header_test.go | 4 +- core/testing.go | 19 +++++ go.mod | 22 +++--- go.sum | 43 +++++------ nodebuilder/core/constructors.go | 112 ++++++++++++++++++++++++++++- nodebuilder/core/module.go | 10 +-- nodebuilder/core/opts.go | 8 +-- nodebuilder/core/tls.go | 44 ------------ nodebuilder/module.go | 4 +- nodebuilder/node_bridge_test.go | 13 ++-- nodebuilder/state/core.go | 20 +----- nodebuilder/state/module.go | 2 - nodebuilder/testing.go | 13 ++-- nodebuilder/tests/swamp/swamp.go | 24 ++++--- state/core_access.go | 119 ++----------------------------- state/core_access_test.go | 12 ++-- state/integration_test.go | 8 ++- 22 files changed, 228 insertions(+), 370 deletions(-) delete mode 100644 core/client.go delete mode 100644 nodebuilder/core/tls.go diff --git a/core/client.go b/core/client.go deleted file mode 100644 index 2bef6dba0c..0000000000 --- a/core/client.go +++ /dev/null @@ -1,67 +0,0 @@ -package core - -import ( - "fmt" - - coregrpc "github.com/tendermint/tendermint/rpc/grpc" - "google.golang.org/grpc" - "google.golang.org/grpc/credentials/insecure" -) - -// Client is a core gRPC client. -type Client struct { - coregrpc.BlockAPIClient - host, port string - conn *grpc.ClientConn -} - -// NewClient creates a new Client that communicates with a remote Core endpoint over gRPC. -// The connection is not started when creating the client. -// Use the Start method to start the connection. -func NewClient(host, port string) *Client { - return &Client{ - host: host, - port: port, - } -} - -// Start created the Client's gRPC connection with optional dial options. -// If the connection is already started, it does nothing. -func (c *Client) Start(opts ...grpc.DialOption) error { - if c.IsRunning() { - return nil - } - if len(opts) == 0 { - opts = append(opts, grpc.WithTransportCredentials(insecure.NewCredentials())) - } - conn, err := grpc.NewClient( - fmt.Sprintf("%s:%s", c.host, c.port), - opts..., - ) - if err != nil { - return err - } - c.conn = conn - - c.BlockAPIClient = coregrpc.NewBlockAPIClient(conn) - return nil -} - -// IsRunning checks if the client's connection is established and ready for use. -// It returns true if the connection is active, false otherwise. -func (c *Client) IsRunning() bool { - return c.conn != nil && c.BlockAPIClient != nil -} - -// Stop terminates the Client's gRPC connection and releases all related resources. -// If the connection is already stopped, it does nothing. -func (c *Client) Stop() error { - if !c.IsRunning() { - return nil - } - defer func() { - c.conn = nil - c.BlockAPIClient = nil - }() - return c.conn.Close() -} diff --git a/core/exchange_test.go b/core/exchange_test.go index 5ed165b857..54b07ec914 100644 --- a/core/exchange_test.go +++ b/core/exchange_test.go @@ -138,7 +138,7 @@ func TestExchange_StoreHistoricIfArchival(t *testing.T) { // initialize store with genesis block genHeight := int64(1) - genBlock, err := fetcher.GetBlock(ctx, &genHeight) + genBlock, err := fetcher.GetBlock(ctx, genHeight) require.NoError(t, err) genHeader, err := ce.Get(ctx, genBlock.Header.Hash().Bytes()) require.NoError(t, err) @@ -170,8 +170,7 @@ func createCoreFetcher(t *testing.T, cfg *testnode.Config) (*BlockFetcher, testn require.NoError(t, err) host, port, err := net.SplitHostPort(cctx.GRPCClient.Target()) require.NoError(t, err) - client := NewClient(host, port) - require.NoError(t, client.Start()) + client := newTestClient(t, host, port) fetcher, err := NewBlockFetcher(client) require.NoError(t, err) return fetcher, cctx diff --git a/core/fetcher.go b/core/fetcher.go index 19f7c12868..6f049cc7ac 100644 --- a/core/fetcher.go +++ b/core/fetcher.go @@ -2,7 +2,6 @@ package core import ( "context" - "errors" "fmt" "io" "sync/atomic" @@ -13,14 +12,13 @@ import ( tmproto "github.com/tendermint/tendermint/proto/tendermint/types" coregrpc "github.com/tendermint/tendermint/rpc/grpc" "github.com/tendermint/tendermint/types" + "google.golang.org/grpc" libhead "github.com/celestiaorg/go-header" ) const newBlockSubscriber = "NewBlock/Events" -var ErrClientNotRunning = errors.New("gRPC connection to core node is not running") - type SignedBlock struct { Header *types.Header `json:"header"` Commit *types.Commit `json:"commit"` @@ -34,7 +32,7 @@ var ( ) type BlockFetcher struct { - client *Client + client coregrpc.BlockAPIClient doneCh chan struct{} cancel context.CancelFunc @@ -42,9 +40,9 @@ type BlockFetcher struct { } // NewBlockFetcher returns a new `BlockFetcher`. -func NewBlockFetcher(client *Client) (*BlockFetcher, error) { +func NewBlockFetcher(conn *grpc.ClientConn) (*BlockFetcher, error) { return &BlockFetcher{ - client: client, + client: coregrpc.NewBlockAPIClient(conn), }, nil } @@ -62,10 +60,6 @@ func (f *BlockFetcher) Stop(ctx context.Context) error { // GetBlockInfo queries Core for additional block information, like Commit and ValidatorSet. func (f *BlockFetcher) GetBlockInfo(ctx context.Context, height int64) (*types.Commit, *types.ValidatorSet, error) { - // return error if the client is still not started - if !f.client.IsRunning() { - return nil, nil, ErrClientNotRunning - } commit, err := f.Commit(ctx, height) if err != nil { return nil, nil, fmt.Errorf("core/fetcher: getting commit at height %d: %w", height, err) @@ -87,10 +81,6 @@ func (f *BlockFetcher) GetBlockInfo(ctx context.Context, height int64) (*types.C // GetBlock queries Core for a `Block` at the given height. // if the height is nil, use the latest height func (f *BlockFetcher) GetBlock(ctx context.Context, height int64) (*SignedBlock, error) { - // return error if the client is still not started - if !f.client.IsRunning() { - return nil, ErrClientNotRunning - } stream, err := f.client.BlockByHeight(ctx, &coregrpc.BlockByHeightRequest{Height: height}) if err != nil { return nil, err @@ -103,10 +93,6 @@ func (f *BlockFetcher) GetBlock(ctx context.Context, height int64) (*SignedBlock } func (f *BlockFetcher) GetBlockByHash(ctx context.Context, hash libhead.Hash) (*types.Block, error) { - // return error if the client is still not started - if !f.client.IsRunning() { - return nil, ErrClientNotRunning - } if hash == nil { return nil, fmt.Errorf("cannot get block with nil hash") } @@ -125,10 +111,6 @@ func (f *BlockFetcher) GetBlockByHash(ctx context.Context, hash libhead.Hash) (* // GetSignedBlock queries Core for a `Block` at the given height. // if the height is nil, use the latest height. func (f *BlockFetcher) GetSignedBlock(ctx context.Context, height int64) (*SignedBlock, error) { - // return error if the client is still not started - if !f.client.IsRunning() { - return nil, ErrClientNotRunning - } stream, err := f.client.BlockByHeight(ctx, &coregrpc.BlockByHeightRequest{Height: height}) if err != nil { return nil, err @@ -140,10 +122,6 @@ func (f *BlockFetcher) GetSignedBlock(ctx context.Context, height int64) (*Signe // the given height. // If the height is nil, use the latest height. func (f *BlockFetcher) Commit(ctx context.Context, height int64) (*types.Commit, error) { - // return error if the client is still not started - if !f.client.IsRunning() { - return nil, ErrClientNotRunning - } res, err := f.client.Commit(ctx, &coregrpc.CommitRequest{Height: height}) if err != nil { return nil, err @@ -165,10 +143,6 @@ func (f *BlockFetcher) Commit(ctx context.Context, height int64) (*types.Commit, // block at the given height. // If the height is nil, use the latest height. func (f *BlockFetcher) ValidatorSet(ctx context.Context, height int64) (*types.ValidatorSet, error) { - // return error if the client is still not started - if !f.client.IsRunning() { - return nil, ErrClientNotRunning - } res, err := f.client.ValidatorSet(ctx, &coregrpc.ValidatorSetRequest{Height: height}) if err != nil { return nil, err @@ -189,10 +163,6 @@ func (f *BlockFetcher) ValidatorSet(ctx context.Context, height int64) (*types.V // SubscribeNewBlockEvent subscribes to new block events from Core, returning // a new block event channel on success. func (f *BlockFetcher) SubscribeNewBlockEvent(ctx context.Context) (<-chan types.EventDataSignedBlock, error) { - // return error if the client is still not started - if !f.client.IsRunning() { - return nil, ErrClientNotRunning - } if f.isListeningForBlocks.Load() { return nil, fmt.Errorf("already subscribed to new blocks") } @@ -252,10 +222,6 @@ func (f *BlockFetcher) SubscribeNewBlockEvent(ctx context.Context) (<-chan types // syncing, and false for already caught up. It can also return an error // in the case of a failed status request. func (f *BlockFetcher) IsSyncing(ctx context.Context) (bool, error) { - // return error if the client is still not started - if !f.client.IsRunning() { - return false, ErrClientNotRunning - } resp, err := f.client.Status(ctx, &coregrpc.StatusRequest{}) if err != nil { return false, err diff --git a/core/fetcher_no_race_test.go b/core/fetcher_no_race_test.go index 2f34e4bc05..d184fb8b91 100644 --- a/core/fetcher_no_race_test.go +++ b/core/fetcher_no_race_test.go @@ -22,8 +22,7 @@ func TestBlockFetcherHeaderValues(t *testing.T) { node := StartTestNode(t) host, port, err := net.SplitHostPort(node.GRPCClient.Target()) require.NoError(t, err) - client := NewClient(host, port) - require.NoError(t, client.Start()) + client := newTestClient(t, host, port) fetcher, err := NewBlockFetcher(client) require.NoError(t, err) diff --git a/core/fetcher_test.go b/core/fetcher_test.go index 14eeab0bd7..8d7659494d 100644 --- a/core/fetcher_test.go +++ b/core/fetcher_test.go @@ -16,11 +16,9 @@ func TestBlockFetcher_GetBlock_and_SubscribeNewBlockEvent(t *testing.T) { host, port, err := net.SplitHostPort(StartTestNode(t).GRPCClient.Target()) require.NoError(t, err) - client := NewClient(host, port) - require.NoError(t, client.Start()) + client := newTestClient(t, host, port) fetcher, err := NewBlockFetcher(client) require.NoError(t, err) - // generate some blocks newBlockChan, err := fetcher.SubscribeNewBlockEvent(ctx) require.NoError(t, err) diff --git a/core/header_test.go b/core/header_test.go index d3c8ab116a..dcc5dba9e2 100644 --- a/core/header_test.go +++ b/core/header_test.go @@ -23,11 +23,9 @@ func TestMakeExtendedHeaderForEmptyBlock(t *testing.T) { host, port, err := net.SplitHostPort(StartTestNode(t).GRPCClient.Target()) require.NoError(t, err) - client := NewClient(host, port) - require.NoError(t, client.Start()) + client := newTestClient(t, host, port) fetcher, err := NewBlockFetcher(client) require.NoError(t, err) - sub, err := fetcher.SubscribeNewBlockEvent(ctx) require.NoError(t, err) <-sub diff --git a/core/testing.go b/core/testing.go index d4b5f6334b..586bf57f83 100644 --- a/core/testing.go +++ b/core/testing.go @@ -1,10 +1,16 @@ package core import ( + "context" + "net" "testing" "time" + "github.com/stretchr/testify/require" tmrand "github.com/tendermint/tendermint/libs/rand" + "google.golang.org/grpc" + "google.golang.org/grpc/connectivity" + "google.golang.org/grpc/credentials/insecure" "github.com/celestiaorg/celestia-app/v3/test/util/genesis" "github.com/celestiaorg/celestia-app/v3/test/util/testnode" @@ -60,3 +66,16 @@ func generateRandomAccounts(n int) []string { } return accounts } + +func newTestClient(t *testing.T, ip, port string) *grpc.ClientConn { + t.Helper() + opt := grpc.WithTransportCredentials(insecure.NewCredentials()) + endpoint := net.JoinHostPort(ip, port) + client, err := grpc.NewClient(endpoint, opt) + require.NoError(t, err) + ctx, cancel := context.WithTimeout(context.Background(), time.Second*2) + t.Cleanup(cancel) + ready := client.WaitForStateChange(ctx, connectivity.Ready) + require.True(t, ready) + return client +} diff --git a/go.mod b/go.mod index 3916a18bed..495934f61b 100644 --- a/go.mod +++ b/go.mod @@ -7,7 +7,7 @@ require ( github.com/BurntSushi/toml v1.4.1-0.20240526193622-a339e1f7089c github.com/alecthomas/jsonschema v0.0.0-20220216202328-9eeeec9d044b github.com/benbjohnson/clock v1.3.5 - github.com/celestiaorg/celestia-app/v3 v3.2.0 + github.com/celestiaorg/celestia-app/v3 v3.3.0-rc0 github.com/celestiaorg/go-fraud v0.2.1 github.com/celestiaorg/go-header v0.6.3 github.com/celestiaorg/go-libp2p-messenger v0.2.0 @@ -55,7 +55,7 @@ require ( github.com/rollkit/go-da v0.8.0 github.com/spf13/cobra v1.8.1 github.com/spf13/pflag v1.0.5 - github.com/stretchr/testify v1.9.0 + github.com/stretchr/testify v1.10.0 github.com/tendermint/tendermint v0.34.29 go.opentelemetry.io/contrib/instrumentation/runtime v0.45.0 go.opentelemetry.io/otel v1.31.0 @@ -69,10 +69,10 @@ require ( go.opentelemetry.io/proto/otlp v1.3.1 go.uber.org/fx v1.23.0 go.uber.org/zap v1.27.0 - golang.org/x/crypto v0.29.0 + golang.org/x/crypto v0.31.0 golang.org/x/exp v0.0.0-20241009180824-f66d83c29e7c - golang.org/x/sync v0.9.0 - golang.org/x/text v0.20.0 + golang.org/x/sync v0.10.0 + golang.org/x/text v0.21.0 google.golang.org/grpc v1.68.0 google.golang.org/protobuf v1.35.1 ) @@ -128,7 +128,7 @@ require ( github.com/cosmos/iavl v0.19.6 // indirect github.com/cosmos/ibc-apps/middleware/packet-forward-middleware/v6 v6.1.2 // indirect github.com/cosmos/ibc-go/v6 v6.3.0 // indirect - github.com/cosmos/ledger-cosmos-go v0.13.2 // indirect + github.com/cosmos/ledger-cosmos-go v0.14.0 // indirect github.com/crate-crypto/go-ipa v0.0.0-20240223125850-b1e8a79f509c // indirect github.com/crate-crypto/go-kzg-4844 v1.0.0 // indirect github.com/cskr/pubsub v1.0.2 // indirect @@ -330,10 +330,10 @@ require ( go.uber.org/mock v0.5.0 // indirect go.uber.org/multierr v1.11.0 // indirect golang.org/x/mod v0.21.0 // indirect - golang.org/x/net v0.31.0 // indirect + golang.org/x/net v0.33.0 // indirect golang.org/x/oauth2 v0.23.0 // indirect - golang.org/x/sys v0.27.0 // indirect - golang.org/x/term v0.26.0 // indirect + golang.org/x/sys v0.28.0 // indirect + golang.org/x/term v0.27.0 // indirect golang.org/x/time v0.5.0 // indirect golang.org/x/tools v0.26.0 // indirect golang.org/x/xerrors v0.0.0-20240903120638-7835f813f4da // indirect @@ -352,12 +352,12 @@ require ( ) replace ( - github.com/cosmos/cosmos-sdk => github.com/celestiaorg/cosmos-sdk v1.25.1-sdk-v0.46.16 + github.com/cosmos/cosmos-sdk => github.com/celestiaorg/cosmos-sdk v1.27.0-sdk-v0.46.16 github.com/filecoin-project/dagstore => github.com/celestiaorg/dagstore v0.0.0-20230824094345-537c012aa403 github.com/gogo/protobuf => github.com/regen-network/protobuf v1.3.3-alpha.regen.1 // broken goleveldb needs to be replaced for the cosmos-sdk and celestia-app github.com/syndtr/goleveldb => github.com/syndtr/goleveldb v1.0.1-0.20210819022825-2ae1ddf74ef7 - github.com/tendermint/tendermint => github.com/celestiaorg/celestia-core v1.44.1-tm-v0.34.35 + github.com/tendermint/tendermint => github.com/celestiaorg/celestia-core v1.45.0-tm-v0.34.35 ) replace github.com/ipfs/boxo => github.com/celestiaorg/boxo v0.0.0-20241118122411-70a650316c3b diff --git a/go.sum b/go.sum index 15496fee29..5715acbcd2 100644 --- a/go.sum +++ b/go.sum @@ -347,12 +347,12 @@ github.com/celestiaorg/blobstream-contracts/v3 v3.1.0 h1:h1Y4V3EMQ2mFmNtWt2sIhZI github.com/celestiaorg/blobstream-contracts/v3 v3.1.0/go.mod h1:x4DKyfKOSv1ZJM9NwV+Pw01kH2CD7N5zTFclXIVJ6GQ= github.com/celestiaorg/boxo v0.0.0-20241118122411-70a650316c3b h1:M9X7s1WJ/7Ju84ZUbO/6/8XlODkFsj/ln85AE0F6pj8= github.com/celestiaorg/boxo v0.0.0-20241118122411-70a650316c3b/go.mod h1:OpUrJtGmZZktUqJvPOtmP8wSfEFcdF/55d3PNCcYLwc= -github.com/celestiaorg/celestia-app/v3 v3.2.0 h1:s6Yag6hLvUQGwS4TXIwtbgmFT1uDvvjgM8OGPUK5de4= -github.com/celestiaorg/celestia-app/v3 v3.2.0/go.mod h1:heMutO+/UMfH0RXCqosZBrccLWz9krwgaoQmcEEUb4A= -github.com/celestiaorg/celestia-core v1.44.1-tm-v0.34.35 h1:ZPbLW+E8galFjKb+Wl4ZGKS8IUNgzWTrRSBfs1izPcQ= -github.com/celestiaorg/celestia-core v1.44.1-tm-v0.34.35/go.mod h1:bFr0lAGwaJ0mOHSBmib5/ca5pbBf1yKWGPs93Td0HPw= -github.com/celestiaorg/cosmos-sdk v1.25.1-sdk-v0.46.16 h1:W0c1Ib24jGZ3UDnzupoD8PmscnOU39qhVV5qJ69yBvQ= -github.com/celestiaorg/cosmos-sdk v1.25.1-sdk-v0.46.16/go.mod h1:07Z8HJqS8Rw4XlZ+ok3D3NM/X/in8mvcGLvl0Zb5wrA= +github.com/celestiaorg/celestia-app/v3 v3.3.0-rc0 h1:YKhBIUAt6CZjxZijql73mBszuddJyVSTyQd/3ZBxxcc= +github.com/celestiaorg/celestia-app/v3 v3.3.0-rc0/go.mod h1:MKhiQgATDdLouzC5KvXDAnDpEgIXyD0MNiq0ChrWFco= +github.com/celestiaorg/celestia-core v1.45.0-tm-v0.34.35 h1:T21AhezjcByAlWDHmiVbpg743Uqk/dqBzJkQsAnbQf8= +github.com/celestiaorg/celestia-core v1.45.0-tm-v0.34.35/go.mod h1:fQ46s1hYFTGFBsHsuGsbxDZ720ZPQow5Iyqw+yErZSo= +github.com/celestiaorg/cosmos-sdk v1.27.0-sdk-v0.46.16 h1:qxWiGrDEcg4FzVTpIXU/v3wjP7q1Lz4AMhSBBRABInU= +github.com/celestiaorg/cosmos-sdk v1.27.0-sdk-v0.46.16/go.mod h1:W30mNt3+2l516HVR8Gt9+Gf4qOrWC9/x18MTEx2GljE= github.com/celestiaorg/go-fraud v0.2.1 h1:oYhxI0gM/EpGRgbVQdRI/LSlqyT65g/WhQGSVGfx09w= github.com/celestiaorg/go-fraud v0.2.1/go.mod h1:lNY1i4K6kUeeE60Z2VK8WXd+qXb8KRzfBhvwPkK6aUc= github.com/celestiaorg/go-header v0.6.3 h1:VI+fsNxFLeUS7cNn0LgHP6Db66uslnKp/fgMg5nxqHg= @@ -481,8 +481,8 @@ github.com/cosmos/ibc-apps/middleware/packet-forward-middleware/v6 v6.1.2 h1:Hz4 github.com/cosmos/ibc-apps/middleware/packet-forward-middleware/v6 v6.1.2/go.mod h1:Jo934o/sW7fNxuOa/TjCalSalz+1Fd649eLyANaJx8g= github.com/cosmos/ibc-go/v6 v6.3.0 h1:2EkkqDEd9hTQvzB/BsPhYZsu7T/dzAVA8+VD2UuJLSQ= github.com/cosmos/ibc-go/v6 v6.3.0/go.mod h1:Dm14j9s094bGyCEE8W4fD+2t8IneHv+cz+80Mvwjr1w= -github.com/cosmos/ledger-cosmos-go v0.13.2 h1:aY0KZSmUwNKbBm9OvbIjvf7Ozz2YzzpAbgvN2C8x2T0= -github.com/cosmos/ledger-cosmos-go v0.13.2/go.mod h1:HENcEP+VtahZFw38HZ3+LS3Iv5XV6svsnkk9vdJtLr8= +github.com/cosmos/ledger-cosmos-go v0.14.0 h1:WfCHricT3rPbkPSVKRH+L4fQGKYHuGOK9Edpel8TYpE= +github.com/cosmos/ledger-cosmos-go v0.14.0/go.mod h1:E07xCWSBl3mTGofZ2QnL4cIUzMbbGVyik84QYKbX3RA= github.com/cpuguy83/go-md2man v1.0.10 h1:BSKMNlYxDvnunlTymqtgONjNnaRV1sTpcovwwjF22jk= github.com/cpuguy83/go-md2man v1.0.10/go.mod h1:SmD6nW6nTyfqj6ABTjUi3V3JVMnlJmwcJI5acqYI6dE= github.com/cpuguy83/go-md2man/v2 v2.0.0-20190314233015-f79a8a8ca69d/go.mod h1:maD7wRr/U5Z6m/iR4s+kqSMx2CaBsrgA7czyZG/E6dU= @@ -1794,8 +1794,9 @@ github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= github.com/stretchr/testify v1.8.3/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo= github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo= -github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg= github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= +github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA= +github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= github.com/subosito/gotenv v1.6.0 h1:9NlTDc1FTs4qu0DDq7AEtTPNw6SVm7uBMsUCUjABIf8= github.com/subosito/gotenv v1.6.0/go.mod h1:Dk4QP5c2W3ibzajGcXpNraDfq2IrhjMIvMSWPKKo0FU= github.com/supranational/blst v0.3.13 h1:AYeSxdOMacwu7FBmpfloBz5pbFXDmJL33RuwnKtmTjk= @@ -2013,8 +2014,8 @@ golang.org/x/crypto v0.1.0/go.mod h1:RecgLatLF4+eUMCP1PoPZQb+cVrJcOPbHkTkbkB9sbw golang.org/x/crypto v0.8.0/go.mod h1:mRqEX+O9/h5TFCrQhkgjo2yKi0yYA+9ecGkdQoHrywE= golang.org/x/crypto v0.12.0/go.mod h1:NF0Gs7EO5K4qLn+Ylc+fih8BSTeIjAP05siRnAh98yw= golang.org/x/crypto v0.18.0/go.mod h1:R0j02AL6hcrfOiy9T4ZYp/rcWeMxM3L6QYxlOuEG1mg= -golang.org/x/crypto v0.29.0 h1:L5SG1JTTXupVV3n6sUqMTeWbjAyfPwoda2DLX8J8FrQ= -golang.org/x/crypto v0.29.0/go.mod h1:+F4F4N5hv6v38hfeYwTdx20oUvLLc+QfrE9Ax9HtgRg= +golang.org/x/crypto v0.31.0 h1:ihbySMvVjLAeSH1IbfcRTkD/iNscyz8rGzjF/E5hV6U= +golang.org/x/crypto v0.31.0/go.mod h1:kDsLvtWBEx7MV9tJOj9bnXsPbxwJQ6csT/x4KIN4Ssk= golang.org/x/exp v0.0.0-20180321215751-8460e604b9de/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20180807140117-3d87b88a115f/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= @@ -2146,8 +2147,8 @@ golang.org/x/net v0.9.0/go.mod h1:d48xBJpPfHeWQsugry2m+kC02ZBRGRgulfHnEXEuWns= golang.org/x/net v0.10.0/go.mod h1:0qNGK6F8kojg2nk9dLZ2mShWaEBan6FAoqfSigmmuDg= golang.org/x/net v0.14.0/go.mod h1:PpSgVXXLK0OxS0F31C1/tv6XNguvCrnXIDrFMspZIUI= golang.org/x/net v0.20.0/go.mod h1:z8BVo6PvndSri0LbOE3hAn0apkU+1YvI6E70E9jsnvY= -golang.org/x/net v0.31.0 h1:68CPQngjLL0r2AlUKiSxtQFKvzRVbnzLwMUn5SzcLHo= -golang.org/x/net v0.31.0/go.mod h1:P4fl1q7dY2hnZFxEk4pPSkDHF+QqjitcnDjUQyMM+pM= +golang.org/x/net v0.33.0 h1:74SYHlV8BIgHIFC/LrYkOGIwL19eTYXQ5wc6TBuO36I= +golang.org/x/net v0.33.0/go.mod h1:HXLR5J+9DxmrqMwG9qjGCxZ+zKXxBru04zlTvWlWuN4= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= golang.org/x/oauth2 v0.0.0-20181017192945-9dcd33a902f4/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= golang.org/x/oauth2 v0.0.0-20181203162652-d668ce993890/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= @@ -2193,8 +2194,8 @@ golang.org/x/sync v0.0.0-20220601150217-0de741cfad7f/go.mod h1:RxMgew5VJxzue5/jJ golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20220929204114-8fcdb60fdcc0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= -golang.org/x/sync v0.9.0 h1:fEo0HyrW1GIgZdpbhCRO0PkJajUS5H9IFUztCgEo2jQ= -golang.org/x/sync v0.9.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= +golang.org/x/sync v0.10.0 h1:3NQrjDixjgGwUOCaF8w2+VYHv0Ve/vGYSbdkTa98gmQ= +golang.org/x/sync v0.10.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= golang.org/x/sys v0.0.0-20180810173357-98c5dad5d1a0/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180823144017-11551d06cbcc/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= @@ -2324,8 +2325,8 @@ golang.org/x/sys v0.12.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.14.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/sys v0.16.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/sys v0.21.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= -golang.org/x/sys v0.27.0 h1:wBqf8DvsY9Y/2P8gAfPDEYNuS30J4lPHJxXSb/nJZ+s= -golang.org/x/sys v0.27.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/sys v0.28.0 h1:Fksou7UEQUWlKvIdsqzJmUmCX3cZuD2+P3XyyzwMhlA= +golang.org/x/sys v0.28.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/term v0.0.0-20201117132131-f5c789dd3221/go.mod h1:Nr5EML6q2oocZ2LXRh80K7BxOlk5/8JxuGnuhpl+muw= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= @@ -2335,8 +2336,8 @@ golang.org/x/term v0.7.0/go.mod h1:P32HKFT3hSsZrRxla30E9HqToFYAQPCMs/zFMBUFqPY= golang.org/x/term v0.8.0/go.mod h1:xPskH00ivmX89bAKVGSKKtLOWNx2+17Eiy94tnKShWo= golang.org/x/term v0.11.0/go.mod h1:zC9APTIj3jG3FdV/Ons+XE1riIZXG4aZ4GTHiPZJPIU= golang.org/x/term v0.16.0/go.mod h1:yn7UURbUtPyrVJPGPq404EukNFxcm/foM+bV/bfcDsY= -golang.org/x/term v0.26.0 h1:WEQa6V3Gja/BhNxg540hBip/kkaYtRg3cxg4oXSw4AU= -golang.org/x/term v0.26.0/go.mod h1:Si5m1o57C5nBNQo5z1iq+XDijt21BDBDp2bK0QI8e3E= +golang.org/x/term v0.27.0 h1:WP60Sv1nlK1T6SupCHbXzSaN0b9wUmsPoRS9b61A23Q= +golang.org/x/term v0.27.0/go.mod h1:iMsnZpn0cago0GOrHO2+Y7u7JPn5AylBrcoWkElMTSM= golang.org/x/text v0.0.0-20170915032832-14c0d48ead0c/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.1-0.20180807135948-17ff2d5776d2/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= @@ -2351,8 +2352,8 @@ golang.org/x/text v0.7.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8= golang.org/x/text v0.9.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8= golang.org/x/text v0.12.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE= golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU= -golang.org/x/text v0.20.0 h1:gK/Kv2otX8gz+wn7Rmb3vT96ZwuoxnQlY+HlJVj7Qug= -golang.org/x/text v0.20.0/go.mod h1:D4IsuqiFMhST5bX19pQ9ikHC2GsaKyk/oF+pn3ducp4= +golang.org/x/text v0.21.0 h1:zyQAAkrwaneQ066sspRyJaG9VNi/YJ1NfzcGB3hZ/qo= +golang.org/x/text v0.21.0/go.mod h1:4IBbMaMmOPCJ8SecivzSH54+73PCFmPWxNTLm+vZkEQ= golang.org/x/time v0.0.0-20180412165947-fbb02b2291d2/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= diff --git a/nodebuilder/core/constructors.go b/nodebuilder/core/constructors.go index ae598a6029..e4e7593e3f 100644 --- a/nodebuilder/core/constructors.go +++ b/nodebuilder/core/constructors.go @@ -1,9 +1,115 @@ package core import ( - "github.com/celestiaorg/celestia-node/core" + "context" + "crypto/tls" + "encoding/json" + "errors" + "net" + "os" + "path/filepath" + + "go.uber.org/fx" + "google.golang.org/grpc" + "google.golang.org/grpc/connectivity" + "google.golang.org/grpc/credentials" + "google.golang.org/grpc/credentials/insecure" + "google.golang.org/grpc/metadata" + + "github.com/celestiaorg/celestia-node/libs/utils" ) -func remote(cfg Config) *core.Client { - return core.NewClient(cfg.IP, cfg.Port) +const xtokenFileName = "xtoken.json" + +func grpcClient(lc fx.Lifecycle, cfg Config) (*grpc.ClientConn, error) { + var opts []grpc.DialOption + if cfg.TLSEnabled { + opts = append(opts, grpc.WithTransportCredentials( + credentials.NewTLS(&tls.Config{MinVersion: tls.VersionTLS12})), + ) + } else { + opts = append(opts, grpc.WithTransportCredentials(insecure.NewCredentials())) + } + if cfg.XTokenPath != "" { + xToken, err := parseTokenPath(cfg.XTokenPath) + if err != nil { + return nil, err + } + opts = append(opts, grpc.WithUnaryInterceptor(authInterceptor(xToken))) + opts = append(opts, grpc.WithStreamInterceptor(authStreamInterceptor(xToken))) + } + + endpoint := net.JoinHostPort(cfg.IP, cfg.Port) + conn, err := grpc.NewClient(endpoint, opts...) + if err != nil { + return nil, err + } + lc.Append(fx.Hook{ + OnStart: func(ctx context.Context) error { + conn.Connect() + if !conn.WaitForStateChange(ctx, connectivity.Ready) { + return errors.New("couldn't connect to core endpoint") + } + return nil + }, + OnStop: func(context.Context) error { + return conn.Close() + }, + }) + return conn, nil +} + +func authInterceptor(xtoken string) grpc.UnaryClientInterceptor { + return func( + ctx context.Context, + method string, + req, reply interface{}, + cc *grpc.ClientConn, + invoker grpc.UnaryInvoker, + opts ...grpc.CallOption, + ) error { + ctx = metadata.AppendToOutgoingContext(ctx, "x-token", xtoken) + return invoker(ctx, method, req, reply, cc, opts...) + } +} + +func authStreamInterceptor(xtoken string) grpc.StreamClientInterceptor { + return func( + ctx context.Context, + desc *grpc.StreamDesc, + cc *grpc.ClientConn, + method string, + streamer grpc.Streamer, + opts ...grpc.CallOption, + ) (grpc.ClientStream, error) { + ctx = metadata.AppendToOutgoingContext(ctx, "x-token", xtoken) + return streamer(ctx, desc, cc, method, opts...) + } +} + +// parseTokenPath retrieves the authentication token from a JSON file at the specified path. +func parseTokenPath(xtokenPath string) (string, error) { + xtokenPath = filepath.Join(xtokenPath, xtokenFileName) + exist := utils.Exists(xtokenPath) + if !exist { + return "", os.ErrNotExist + } + + token, err := os.ReadFile(xtokenPath) + if err != nil { + return "", err + } + + auth := struct { + Token string `json:"x-token"` + }{} + + err = json.Unmarshal(token, &auth) + if err != nil { + return "", err + } + if auth.Token == "" { + return "", errors.New("x-token is empty. Please setup a token or cleanup xtokenPath") + } + return auth.Token, nil } diff --git a/nodebuilder/core/module.go b/nodebuilder/core/module.go index 61a4e3468c..a81365659a 100644 --- a/nodebuilder/core/module.go +++ b/nodebuilder/core/module.go @@ -25,6 +25,7 @@ func ConstructModule(tp node.Type, cfg *Config, options ...fx.Option) fx.Option baseComponents := fx.Options( fx.Supply(*cfg), fx.Error(cfgErr), + fx.Provide(grpcClient), fx.Options(options...), ) @@ -74,15 +75,6 @@ func ConstructModule(tp node.Type, cfg *Config, options ...fx.Option) fx.Option return listener.Stop(ctx) }), )), - fx.Provide(fx.Annotate( - remote, - fx.OnStart(func(_ context.Context, client *core.Client) error { - return client.Start() - }), - fx.OnStop(func(_ context.Context, client *core.Client) error { - return client.Stop() - }), - )), ) default: panic("invalid node type") diff --git a/nodebuilder/core/opts.go b/nodebuilder/core/opts.go index 26b25b4541..5de789daa9 100644 --- a/nodebuilder/core/opts.go +++ b/nodebuilder/core/opts.go @@ -2,15 +2,15 @@ package core import ( "go.uber.org/fx" + "google.golang.org/grpc" - "github.com/celestiaorg/celestia-node/core" "github.com/celestiaorg/celestia-node/header" "github.com/celestiaorg/celestia-node/libs/fxutil" ) -// WithClient sets a custom client for core process -func WithClient(client *core.Client) fx.Option { - return fxutil.ReplaceAs(client, new(core.Client)) +// WithConnection sets a custom client for core process +func WithConnection(conn *grpc.ClientConn) fx.Option { + return fxutil.ReplaceAs(conn, new(grpc.ClientConn)) } // WithHeaderConstructFn sets custom func that creates extended header diff --git a/nodebuilder/core/tls.go b/nodebuilder/core/tls.go deleted file mode 100644 index f280d0cfd0..0000000000 --- a/nodebuilder/core/tls.go +++ /dev/null @@ -1,44 +0,0 @@ -package core - -import ( - "crypto/tls" - "encoding/json" - "errors" - "os" - "path/filepath" - - "github.com/celestiaorg/celestia-node/libs/utils" -) - -const xtokenFileName = "xtoken.json" - -func EmptyTLSConfig() *tls.Config { - return &tls.Config{MinVersion: tls.VersionTLS12} -} - -// XToken retrieves the authentication token from a JSON file at the specified path. -func XToken(xtokenPath string) (string, error) { - xtokenPath = filepath.Join(xtokenPath, xtokenFileName) - exist := utils.Exists(xtokenPath) - if !exist { - return "", os.ErrNotExist - } - - token, err := os.ReadFile(xtokenPath) - if err != nil { - return "", err - } - - auth := struct { - Token string `json:"x-token"` - }{} - - err = json.Unmarshal(token, &auth) - if err != nil { - return "", err - } - if auth.Token == "" { - return "", errors.New("x-token is empty. Please setup a token or cleanup xtokenPath") - } - return auth.Token, nil -} diff --git a/nodebuilder/module.go b/nodebuilder/module.go index 5a774b8b9b..43de56eedd 100644 --- a/nodebuilder/module.go +++ b/nodebuilder/module.go @@ -42,14 +42,14 @@ func ConstructModule(tp node.Type, network p2p.Network, cfg *Config, store Store fx.Supply(store.Config), fx.Provide(store.Datastore), fx.Provide(store.Keystore), + core.ConstructModule(tp, &cfg.Core), fx.Supply(node.StorePath(store.Path())), // modules provided by the node p2p.ConstructModule(tp, &cfg.P2P), - state.ConstructModule(tp, &cfg.State, &cfg.Core), modhead.ConstructModule[*header.ExtendedHeader](tp, &cfg.Header), share.ConstructModule(tp, &cfg.Share), gateway.ConstructModule(tp, &cfg.Gateway), - core.ConstructModule(tp, &cfg.Core), + state.ConstructModule(tp, &cfg.State, &cfg.Core), das.ConstructModule(tp, &cfg.DASer), fraud.ConstructModule(tp), blob.ConstructModule(), diff --git a/nodebuilder/node_bridge_test.go b/nodebuilder/node_bridge_test.go index a4cac93c99..5647a6afa1 100644 --- a/nodebuilder/node_bridge_test.go +++ b/nodebuilder/node_bridge_test.go @@ -6,6 +6,8 @@ import ( "testing" "github.com/stretchr/testify/require" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" "github.com/celestiaorg/celestia-node/core" coremodule "github.com/celestiaorg/celestia-node/nodebuilder/core" @@ -20,11 +22,14 @@ func TestBridge_WithMockedCoreClient(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) t.Cleanup(cancel) - host, port, err := net.SplitHostPort(core.StartTestNode(t).GRPCClient.Target()) + _, _, err := net.SplitHostPort(core.StartTestNode(t).GRPCClient.Target()) require.NoError(t, err) - client := core.NewClient(host, port) - require.NoError(t, client.Start()) - node, err := New(node.Bridge, p2p.Private, repo, coremodule.WithClient(client)) + con, err := grpc.NewClient( + core.StartTestNode(t).GRPCClient.Target(), + grpc.WithTransportCredentials(insecure.NewCredentials()), + ) + require.NoError(t, err) + node, err := New(node.Bridge, p2p.Private, repo, coremodule.WithConnection(con)) require.NoError(t, err) require.NotNil(t, node) err = node.Start(ctx) diff --git a/nodebuilder/state/core.go b/nodebuilder/state/core.go index c55f0da01b..d66da88c44 100644 --- a/nodebuilder/state/core.go +++ b/nodebuilder/state/core.go @@ -1,16 +1,13 @@ package state import ( - "errors" - "os" - "github.com/cosmos/cosmos-sdk/crypto/keyring" + "google.golang.org/grpc" libfraud "github.com/celestiaorg/go-fraud" "github.com/celestiaorg/go-header/sync" "github.com/celestiaorg/celestia-node/header" - "github.com/celestiaorg/celestia-node/nodebuilder/core" modfraud "github.com/celestiaorg/celestia-node/nodebuilder/fraud" "github.com/celestiaorg/celestia-node/nodebuilder/p2p" "github.com/celestiaorg/celestia-node/share/eds/byzantine" @@ -20,30 +17,19 @@ import ( // coreAccessor constructs a new instance of state.Module over // a celestia-core connection. func coreAccessor( - corecfg core.Config, keyring keyring.Keyring, keyname AccountName, sync *sync.Syncer[*header.ExtendedHeader], fraudServ libfraud.Service[*header.ExtendedHeader], network p2p.Network, - opts []state.Option, + client *grpc.ClientConn, ) ( *state.CoreAccessor, Module, *modfraud.ServiceBreaker[*state.CoreAccessor, *header.ExtendedHeader], error, ) { - if corecfg.TLSEnabled { - tlsCfg := core.EmptyTLSConfig() - xtoken, err := core.XToken(corecfg.XTokenPath) - if err != nil && !errors.Is(err, os.ErrNotExist) { - return nil, nil, nil, err - } - opts = append(opts, state.WithTLSConfig(tlsCfg), state.WithXToken(xtoken)) - } - - ca, err := state.NewCoreAccessor(keyring, string(keyname), sync, - corecfg.IP, corecfg.Port, network.String(), opts...) + ca, err := state.NewCoreAccessor(keyring, string(keyname), sync, client, network.String()) sBreaker := &modfraud.ServiceBreaker[*state.CoreAccessor, *header.ExtendedHeader]{ Service: ca, diff --git a/nodebuilder/state/module.go b/nodebuilder/state/module.go index bd6f2081d3..0e80ab3209 100644 --- a/nodebuilder/state/module.go +++ b/nodebuilder/state/module.go @@ -23,11 +23,9 @@ var log = logging.Logger("module/state") func ConstructModule(tp node.Type, cfg *Config, coreCfg *core.Config) fx.Option { // sanitize config values before constructing module cfgErr := cfg.Validate() - opts := make([]state.Option, 0) baseComponents := fx.Options( fx.Supply(*cfg), fx.Error(cfgErr), - fx.Supply(opts), fx.Provide(func(ks keystore.Keystore) (keyring.Keyring, AccountName, error) { return Keyring(*cfg, ks) }), diff --git a/nodebuilder/testing.go b/nodebuilder/testing.go index d205fca120..44c026f007 100644 --- a/nodebuilder/testing.go +++ b/nodebuilder/testing.go @@ -8,6 +8,8 @@ import ( "github.com/cosmos/cosmos-sdk/crypto/keyring" "github.com/stretchr/testify/require" "go.uber.org/fx" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" libhead "github.com/celestiaorg/go-header" @@ -73,12 +75,15 @@ func TestNodeWithConfig(t *testing.T, tp node.Type, cfg *Config, opts ...fx.Opti // in fact, we don't need core.Client in tests, but the Bridge node requires a valid one. // otherwise, it fails with a failed attempt to connect with a custom build client. if tp == node.Bridge { - host, port, err := net.SplitHostPort(core.StartTestNode(t).GRPCClient.Target()) + _, _, err := net.SplitHostPort(core.StartTestNode(t).GRPCClient.Target()) + require.NoError(t, err) + con, err := grpc.NewClient( + core.StartTestNode(t).GRPCClient.Target(), + grpc.WithTransportCredentials(insecure.NewCredentials()), + ) require.NoError(t, err) - client := core.NewClient(host, port) - require.NoError(t, client.Start()) opts = append(opts, - fxutil.ReplaceAs(client, new(core.Client)), + fxutil.ReplaceAs(con, new(grpc.ClientConn)), ) } diff --git a/nodebuilder/tests/swamp/swamp.go b/nodebuilder/tests/swamp/swamp.go index 1f4a56b319..3fe779e76d 100644 --- a/nodebuilder/tests/swamp/swamp.go +++ b/nodebuilder/tests/swamp/swamp.go @@ -18,6 +18,8 @@ import ( "github.com/tendermint/tendermint/types" "go.uber.org/fx" "golang.org/x/exp/maps" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" "github.com/celestiaorg/celestia-app/v3/test/util/testnode" libhead "github.com/celestiaorg/go-header" @@ -181,9 +183,13 @@ func (s *Swamp) setupGenesis() { host, port, err := net.SplitHostPort(s.ClientContext.GRPCClient.Target()) require.NoError(s.t, err) - client := core.NewClient(host, port) - require.NoError(s.t, client.Start()) - fetcher, err := core.NewBlockFetcher(client) + addr := net.JoinHostPort(host, port) + con, err := grpc.NewClient( + addr, + grpc.WithTransportCredentials(insecure.NewCredentials()), + ) + require.NoError(s.t, err) + fetcher, err := core.NewBlockFetcher(con) require.NoError(s.t, err) ex, err := core.NewExchange( @@ -292,12 +298,14 @@ func (s *Swamp) NewNodeWithStore( if err != nil { return nil, err } - client := core.NewClient(host, port) - if err := client.Start(); err != nil { - return nil, err - } + addr := net.JoinHostPort(host, port) + con, err := grpc.NewClient( + addr, + grpc.WithTransportCredentials(insecure.NewCredentials()), + ) + require.NoError(s.t, err) options = append(options, - coremodule.WithClient(client), + coremodule.WithConnection(con), ) default: } diff --git a/state/core_access.go b/state/core_access.go index 80e6a8eccc..07747f5333 100644 --- a/state/core_access.go +++ b/state/core_access.go @@ -2,10 +2,8 @@ package state import ( "context" - "crypto/tls" "errors" "fmt" - "net" "sync" "time" @@ -21,10 +19,6 @@ import ( "github.com/tendermint/tendermint/crypto/merkle" "github.com/tendermint/tendermint/proto/tendermint/crypto" "google.golang.org/grpc" - "google.golang.org/grpc/connectivity" - "google.golang.org/grpc/credentials" - "google.golang.org/grpc/credentials/insecure" - "google.golang.org/grpc/metadata" "github.com/celestiaorg/celestia-app/v3/app" "github.com/celestiaorg/celestia-app/v3/app/encoding" @@ -46,22 +40,6 @@ var ( log = logging.Logger("state") ) -// Option is the functional option that is applied to the coreAccessor instance -// to configure parameters. -type Option func(ca *CoreAccessor) - -func WithTLSConfig(cfg *tls.Config) Option { - return func(ca *CoreAccessor) { - ca.tls = cfg - } -} - -func WithXToken(xtoken string) Option { - return func(ca *CoreAccessor) { - ca.xtoken = xtoken - } -} - // CoreAccessor implements service over a gRPC connection // with a celestia-core node. type CoreAccessor struct { @@ -84,13 +62,8 @@ type CoreAccessor struct { prt *merkle.ProofRuntime coreConn *grpc.ClientConn - coreIP string - port string network string - tls *tls.Config - xtoken string - // these fields are mutatable and thus need to be protected by a mutex lock sync.Mutex lastPayForBlob int64 @@ -109,8 +82,8 @@ func NewCoreAccessor( keyring keyring.Keyring, keyname string, getter libhead.Head[*header.ExtendedHeader], - coreIP, port, network string, - options ...Option, + conn *grpc.ClientConn, + network string, ) (*CoreAccessor, error) { // create verifier prt := merkle.DefaultProofRuntime() @@ -121,33 +94,18 @@ func NewCoreAccessor( keyring: keyring, defaultSignerAccount: keyname, getter: getter, - coreIP: coreIP, - port: port, prt: prt, + coreConn: conn, network: network, } - - for _, opt := range options { - opt(ca) - } return ca, nil } func (ca *CoreAccessor) Start(ctx context.Context) error { - if ca.coreConn != nil { - return fmt.Errorf("core-access: already connected to core endpoint") - } ca.ctx, ca.cancel = context.WithCancel(context.Background()) - - err := ca.startGRPCClient(ctx) - if err != nil { - return fmt.Errorf("failed to start grpc client: %w", err) - } - // create the staking query client ca.stakingCli = stakingtypes.NewQueryClient(ca.coreConn) ca.feeGrantCli = feegrant.NewQueryClient(ca.coreConn) - // create ABCI query client ca.abciQueryCli = tmservice.NewServiceClient(ca.coreConn) resp, err := ca.abciQueryCli.GetNodeInfo(ctx, &tmservice.GetNodeInfoRequest{}) @@ -175,29 +133,8 @@ func (ca *CoreAccessor) Start(ctx context.Context) error { } func (ca *CoreAccessor) Stop(context.Context) error { - if ca.cancel == nil { - log.Warn("core accessor already stopped") - return nil - } - if ca.coreConn == nil { - log.Warn("no connection found to close") - return nil - } - defer ca.cancelCtx() - - // close out core connection - err := ca.coreConn.Close() - if err != nil { - return err - } - - ca.coreConn = nil - return nil -} - -func (ca *CoreAccessor) cancelCtx() { ca.cancel() - ca.cancel = nil + return nil } // SubmitPayForBlob builds, signs, and synchronously submits a MsgPayForBlob with additional @@ -605,40 +542,6 @@ func (ca *CoreAccessor) setupTxClient(ctx context.Context, keyName string) (*use ) } -func (ca *CoreAccessor) startGRPCClient(ctx context.Context) error { - // dial given celestia-core endpoint - endpoint := net.JoinHostPort(ca.coreIP, ca.port) - // By default, the gRPC client is configured to handle an insecure connection. - // If the TLS configuration is not empty, it will be applied to the client's options. - // If the TLS configuration is empty but the X-Token is provided, - // the X-Token will be applied as an interceptor along with an empty TLS configuration. - opts := []grpc.DialOption{grpc.WithTransportCredentials(insecure.NewCredentials())} - if ca.tls != nil { - opts = append(opts, grpc.WithTransportCredentials(credentials.NewTLS(ca.tls))) - } - if ca.xtoken != "" { - opts = append(opts, grpc.WithUnaryInterceptor(authInterceptor(ca.xtoken))) - } - - client, err := grpc.NewClient( - endpoint, - opts..., - ) - if err != nil { - return err - } - // this ensures we can't start the node without core connection - client.Connect() - if !client.WaitForStateChange(ctx, connectivity.Ready) { - // hits the case when context is canceled - return fmt.Errorf("couldn't connect to core endpoint(%s): %w", endpoint, ctx.Err()) - } - ca.coreConn = client - - log.Infof("Connection with core endpoint(%s) established", endpoint) - return nil -} - func (ca *CoreAccessor) submitMsg( ctx context.Context, msg sdktypes.Msg, @@ -695,17 +598,3 @@ func convertToSdkTxResponse(resp *user.TxResponse) *TxResponse { Height: resp.Height, } } - -func authInterceptor(xtoken string) grpc.UnaryClientInterceptor { - return func( - ctx context.Context, - method string, - req, reply interface{}, - cc *grpc.ClientConn, - invoker grpc.UnaryInvoker, - opts ...grpc.CallOption, - ) error { - ctx = metadata.AppendToOutgoingContext(ctx, "x-token", xtoken) - return invoker(ctx, method, req, reply, cc, opts...) - } -} diff --git a/state/core_access_test.go b/state/core_access_test.go index c487944749..cde4e8182f 100644 --- a/state/core_access_test.go +++ b/state/core_access_test.go @@ -6,13 +6,14 @@ import ( "context" "errors" "fmt" - "strings" "testing" "time" sdktypes "github.com/cosmos/cosmos-sdk/types" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" "github.com/celestiaorg/celestia-app/v3/app" "github.com/celestiaorg/celestia-app/v3/pkg/appconsts" @@ -216,11 +217,6 @@ func TestDelegate(t *testing.T) { } } -func extractPort(addr string) string { - splitStr := strings.Split(addr, ":") - return splitStr[len(splitStr)-1] -} - func buildAccessor(t *testing.T) (*CoreAccessor, []string) { chainID := "private" @@ -264,7 +260,9 @@ func buildAccessor(t *testing.T) (*CoreAccessor, []string) { WithAppCreator(appCreator) // needed until https://github.com/celestiaorg/celestia-app/pull/3680 merges cctx, _, grpcAddr := testnode.NewNetwork(t, config) - ca, err := NewCoreAccessor(cctx.Keyring, accounts[0].Name, nil, "127.0.0.1", extractPort(grpcAddr), chainID) + conn, err := grpc.NewClient(grpcAddr, grpc.WithTransportCredentials(insecure.NewCredentials())) + require.NoError(t, err) + ca, err := NewCoreAccessor(cctx.Keyring, accounts[0].Name, nil, conn, chainID) require.NoError(t, err) return ca, getNames(accounts) } diff --git a/state/integration_test.go b/state/integration_test.go index 8680b4d181..f606841417 100644 --- a/state/integration_test.go +++ b/state/integration_test.go @@ -52,8 +52,11 @@ func (s *IntegrationTestSuite) SetupSuite() { s.Require().Greater(len(s.accounts), 0) accountName := s.accounts[0].Name - accessor, err := NewCoreAccessor(s.cctx.Keyring, accountName, localHeader{s.cctx.Client}, "", "", "") + accessor, err := NewCoreAccessor(s.cctx.Keyring, accountName, localHeader{s.cctx.Client}, nil, "") require.NoError(s.T(), err) + ctx, cancel := context.WithCancel(context.Background()) + accessor.ctx = ctx + accessor.cancel = cancel setClients(accessor, s.cctx.GRPCClient) s.accessor = accessor @@ -65,8 +68,7 @@ func (s *IntegrationTestSuite) SetupSuite() { func setClients(ca *CoreAccessor, conn *grpc.ClientConn) { ca.coreConn = conn // create the staking query client - stakingCli := stakingtypes.NewQueryClient(ca.coreConn) - ca.stakingCli = stakingCli + ca.stakingCli = stakingtypes.NewQueryClient(ca.coreConn) ca.abciQueryCli = tmservice.NewServiceClient(ca.coreConn) }