
Commit

refactor(swamp): use RPC client instead of service pointers (celestia…
distractedm1nd authored and vgonkivs committed Oct 5, 2023
1 parent b061c5c commit 84b5988
Showing 8 changed files with 158 additions and 87 deletions.
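
At a high level, this refactor swaps the swamp integration tests' direct access to a node's internal service pointers (HeaderServ, BlobServ, FraudServ, ShareServ, and raw Host access) for calls made through the node's own RPC server, authenticated with an admin JWT signed by the node's new AdminSigner field. A minimal before/after sketch of the pattern, using names taken from the diff below (the surrounding swamp test scaffolding and imports are assumed):

```go
// Before: the test reached into the node's wired services directly.
_, err := bridge.HeaderServ.WaitForHeight(ctx, 3)
require.NoError(t, err)

// After: the test dials the node's RPC server, like an external client would.
// getAdminClient (the helper added in api_test.go below) signs an admin-scoped
// JWT with the node's AdminSigner and connects to RPCServer.ListenAddr().
rpcClient := getAdminClient(ctx, bridge, t)
_, err = rpcClient.Header.WaitForHeight(ctx, 3)
require.NoError(t, err)
```

The payoff is that the tests now exercise the same JSON-RPC surface external users hit, rather than internal wiring that can drift from the public API.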
2 changes: 2 additions & 0 deletions nodebuilder/node.go
@@ -6,6 +6,7 @@ import (
"fmt"
"strings"

"github.com/cristalhq/jwt"
"github.com/ipfs/boxo/blockservice"
"github.com/ipfs/boxo/exchange"
logging "github.com/ipfs/go-log/v2"
@@ -48,6 +49,7 @@ type Node struct {
Network p2p.Network
Bootstrappers p2p.Bootstrappers
Config *Config
AdminSigner jwt.Signer

// rpc components
RPCServer *rpc.Server // not optional
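
The new AdminSigner field is what lets tests mint admin tokens without reaching into the node's keystore. A condensed sketch of how the field is consumed, mirroring the getAdminClient helper added in api_test.go below (nd, ctx, and t are assumed to come from the enclosing test):

```go
// Sign a JWT carrying full admin permissions with the node's own signer,
// then open an RPC client against the node's listen address.
perms := []auth.Permission{"public", "read", "write", "admin"}
token, err := authtoken.NewSignedJWT(nd.AdminSigner, perms)
require.NoError(t, err)

rpcClient, err := client.NewClient(ctx, "ws://"+nd.RPCServer.ListenAddr(), token)
require.NoError(t, err)
```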
48 changes: 27 additions & 21 deletions nodebuilder/tests/api_test.go
@@ -13,11 +13,27 @@ import (
"github.com/celestiaorg/celestia-node/api/rpc/client"
"github.com/celestiaorg/celestia-node/blob"
"github.com/celestiaorg/celestia-node/blob/blobtest"
"github.com/celestiaorg/celestia-node/libs/authtoken"
"github.com/celestiaorg/celestia-node/nodebuilder"
"github.com/celestiaorg/celestia-node/nodebuilder/node"
"github.com/celestiaorg/celestia-node/nodebuilder/tests/swamp"
)

func getAdminClient(ctx context.Context, nd *nodebuilder.Node, t *testing.T) *client.Client {
t.Helper()

signer := nd.AdminSigner
listenAddr := "ws://" + nd.RPCServer.ListenAddr()

jwt, err := authtoken.NewSignedJWT(signer, []auth.Permission{"public", "read", "write", "admin"})
require.NoError(t, err)

client, err := client.NewClient(ctx, listenAddr, jwt)
require.NoError(t, err)

return client
}

func TestNodeModule(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), swamp.DefaultTestTimeout)
t.Cleanup(cancel)
@@ -66,26 +82,20 @@ func TestGetByHeight(t *testing.T) {
err := bridge.Start(ctx)
require.NoError(t, err)

adminPerms := []auth.Permission{"public", "read", "write", "admin"}
jwt, err := bridge.AdminServ.AuthNew(ctx, adminPerms)
require.NoError(t, err)

bridgeAddr := "http://" + bridge.RPCServer.ListenAddr()
client, err := client.NewClient(ctx, bridgeAddr, jwt)
require.NoError(t, err)
rpcClient := getAdminClient(ctx, bridge, t)

// let a few blocks be produced
_, err = client.Header.WaitForHeight(ctx, 3)
_, err = rpcClient.Header.WaitForHeight(ctx, 3)
require.NoError(t, err)

networkHead, err := client.Header.NetworkHead(ctx)
networkHead, err := rpcClient.Header.NetworkHead(ctx)
require.NoError(t, err)
_, err = client.Header.GetByHeight(ctx, networkHead.Height()+1)
_, err = rpcClient.Header.GetByHeight(ctx, networkHead.Height()+1)
require.Nil(t, err, "Requesting syncer.Head()+1 shouldn't return an error")

networkHead, err = client.Header.NetworkHead(ctx)
networkHead, err = rpcClient.Header.NetworkHead(ctx)
require.NoError(t, err)
_, err = client.Header.GetByHeight(ctx, networkHead.Height()+2)
_, err = rpcClient.Header.GetByHeight(ctx, networkHead.Height()+2)
require.ErrorContains(t, err, "given height is from the future")
}

@@ -101,13 +111,7 @@ func TestBlobRPC(t *testing.T) {
err := bridge.Start(ctx)
require.NoError(t, err)

adminPerms := []auth.Permission{"public", "read", "write", "admin"}
jwt, err := bridge.AdminServ.AuthNew(ctx, adminPerms)
require.NoError(t, err)

bridgeAddr := "http://" + bridge.RPCServer.ListenAddr()
client, err := client.NewClient(ctx, bridgeAddr, jwt)
require.NoError(t, err)
rpcClient := getAdminClient(ctx, bridge, t)

appBlobs, err := blobtest.GenerateV0Blobs([]int{8}, false)
require.NoError(t, err)
@@ -119,7 +123,7 @@
)
require.NoError(t, err)

height, err := client.Blob.Submit(ctx, []*blob.Blob{newBlob}, nil)
height, err := rpcClient.Blob.Submit(ctx, []*blob.Blob{newBlob}, nil)
require.NoError(t, err)
require.True(t, height != 0)
}
@@ -147,9 +151,11 @@ func TestHeaderSubscription(t *testing.T) {
err = light.Start(ctx)
require.NoError(t, err)

lightClient := getAdminClient(ctx, light, t)

// subscribe to headers via the light node's RPC header subscription
subctx, subcancel := context.WithCancel(ctx)
sub, err := light.HeaderServ.Subscribe(subctx)
sub, err := lightClient.Header.Subscribe(subctx)
require.NoError(t, err)
// listen for 5 headers
for i := 0; i < 5; i++ {
19 changes: 11 additions & 8 deletions nodebuilder/tests/blob_test.go
@@ -55,12 +55,15 @@ func TestBlobModule(t *testing.T) {
lightNode := sw.NewNodeWithConfig(node.Light, lightCfg)
require.NoError(t, lightNode.Start(ctx))

height, err := fullNode.BlobServ.Submit(ctx, blobs, nil)
fullClient := getAdminClient(ctx, fullNode, t)
lightClient := getAdminClient(ctx, lightNode, t)

height, err := fullClient.Blob.Submit(ctx, blobs, nil)
require.NoError(t, err)

_, err = fullNode.HeaderServ.WaitForHeight(ctx, height)
_, err = fullClient.Header.WaitForHeight(ctx, height)
require.NoError(t, err)
_, err = lightNode.HeaderServ.WaitForHeight(ctx, height)
_, err = lightClient.Header.WaitForHeight(ctx, height)
require.NoError(t, err)

var test = []struct {
@@ -70,15 +73,15 @@
{
name: "Get",
doFn: func(t *testing.T) {
blob1, err := fullNode.BlobServ.Get(ctx, height, blobs[0].Namespace(), blobs[0].Commitment)
blob1, err := fullClient.Blob.Get(ctx, height, blobs[0].Namespace(), blobs[0].Commitment)
require.NoError(t, err)
require.Equal(t, blobs[0], blob1)
},
},
{
name: "GetAll",
doFn: func(t *testing.T) {
newBlobs, err := fullNode.BlobServ.GetAll(ctx, height, []share.Namespace{blobs[0].Namespace()})
newBlobs, err := fullClient.Blob.GetAll(ctx, height, []share.Namespace{blobs[0].Namespace()})
require.NoError(t, err)
require.Len(t, newBlobs, len(appBlobs0))
require.True(t, bytes.Equal(blobs[0].Commitment, newBlobs[0].Commitment))
@@ -88,10 +91,10 @@
{
name: "Included",
doFn: func(t *testing.T) {
proof, err := fullNode.BlobServ.GetProof(ctx, height, blobs[0].Namespace(), blobs[0].Commitment)
proof, err := fullClient.Blob.GetProof(ctx, height, blobs[0].Namespace(), blobs[0].Commitment)
require.NoError(t, err)

included, err := lightNode.BlobServ.Included(
included, err := lightClient.Blob.Included(
ctx,
height,
blobs[0].Namespace(),
@@ -114,7 +117,7 @@ func TestBlobModule(t *testing.T) {
)
require.NoError(t, err)

b, err := fullNode.BlobServ.Get(ctx, height, newBlob.Namespace(), newBlob.Commitment)
b, err := fullClient.Blob.Get(ctx, height, newBlob.Namespace(), newBlob.Commitment)
assert.Nil(t, b)
require.Error(t, err)
require.ErrorIs(t, err, blob.ErrBlobNotFound)
12 changes: 8 additions & 4 deletions nodebuilder/tests/fraud_test.go
@@ -89,9 +89,11 @@ func TestFraudProofHandling(t *testing.T) {
err = full.Start(ctx)
require.NoError(t, err)

fullClient := getAdminClient(ctx, full, t)

// 5.
subCtx, subCancel := context.WithCancel(ctx)
subscr, err := full.FraudServ.Subscribe(subCtx, byzantine.BadEncoding)
subscr, err := fullClient.Fraud.Subscribe(subCtx, byzantine.BadEncoding)
require.NoError(t, err)
select {
case p := <-subscr:
@@ -108,7 +110,7 @@
// lifecycles of each Module.
// 6.
syncCtx, syncCancel := context.WithTimeout(context.Background(), blockTime*5)
_, err = full.HeaderServ.WaitForHeight(syncCtx, 15)
_, err = fullClient.Header.WaitForHeight(syncCtx, 15)
require.ErrorIs(t, err, context.DeadlineExceeded)
syncCancel()

@@ -118,10 +120,11 @@
lnStore := nodebuilder.MockStore(t, cfg)
light := sw.NewNodeWithStore(node.Light, lnStore)
require.NoError(t, light.Start(ctx))
lightClient := getAdminClient(ctx, light, t)

// 8.
subCtx, subCancel = context.WithCancel(ctx)
subscr, err = light.FraudServ.Subscribe(subCtx, byzantine.BadEncoding)
subscr, err = lightClient.Fraud.Subscribe(subCtx, byzantine.BadEncoding)
require.NoError(t, err)
select {
case p := <-subscr:
@@ -135,7 +138,8 @@
// 9.
fN := sw.NewNodeWithStore(node.Full, store)
require.Error(t, fN.Start(ctx))
proofs, err := fN.FraudServ.Get(ctx, byzantine.BadEncoding)
fNClient := getAdminClient(ctx, fN, t)
proofs, err := fNClient.Fraud.Get(ctx, byzantine.BadEncoding)
require.NoError(t, err)
require.NotNil(t, proofs)

22 changes: 15 additions & 7 deletions nodebuilder/tests/nd_test.go
@@ -45,6 +45,9 @@ func TestShrexNDFromLights(t *testing.T) {
err = light.Start(ctx)
require.NoError(t, err)

bridgeClient := getAdminClient(ctx, bridge, t)
lightClient := getAdminClient(ctx, light, t)

// wait for chain to be filled
require.NoError(t, <-fillDn)

Expand All @@ -54,17 +57,17 @@ func TestShrexNDFromLights(t *testing.T) {
// the block that actually has transactions. We can get this data from the
// response returned by FillBlock.
for i := 16; i < blocks; i++ {
h, err := bridge.HeaderServ.GetByHeight(ctx, uint64(i))
h, err := bridgeClient.Header.GetByHeight(ctx, uint64(i))
require.NoError(t, err)

reqCtx, cancel := context.WithTimeout(ctx, time.Second*5)

// ensure to fetch random namespace (not the reserved namespace)
namespace := h.DAH.RowRoots[1][:share.NamespaceSize]

expected, err := bridge.ShareServ.GetSharesByNamespace(reqCtx, h.DAH, namespace)
expected, err := bridgeClient.Share.GetSharesByNamespace(reqCtx, h.DAH, namespace)
require.NoError(t, err)
got, err := light.ShareServ.GetSharesByNamespace(reqCtx, h.DAH, namespace)
got, err := lightClient.Share.GetSharesByNamespace(reqCtx, h.DAH, namespace)
require.NoError(t, err)

require.True(t, len(got[0].Shares) > 0)
@@ -113,12 +116,15 @@ func TestShrexNDFromLightsWithBadFulls(t *testing.T) {
require.NoError(t, startFullNodes(ctx, fulls...))
require.NoError(t, light.Start(ctx))

bridgeClient := getAdminClient(ctx, bridge, t)
lightClient := getAdminClient(ctx, light, t)

// wait for chain to fill up
require.NoError(t, <-fillDn)

// first 2 blocks are not filled with data
for i := 3; i < blocks; i++ {
h, err := bridge.HeaderServ.GetByHeight(ctx, uint64(i))
h, err := bridgeClient.Header.GetByHeight(ctx, uint64(i))
require.NoError(t, err)

if len(h.DAH.RowRoots) != bsize*2 {
@@ -133,16 +139,18 @@
// ensure to fetch random namespace (not the reserved namespace)
namespace := h.DAH.RowRoots[1][:share.NamespaceSize]

expected, err := bridge.ShareServ.GetSharesByNamespace(reqCtx, h.DAH, namespace)
expected, err := bridgeClient.Share.GetSharesByNamespace(reqCtx, h.DAH, namespace)
require.NoError(t, err)
require.True(t, len(expected[0].Shares) > 0)

// choose a random full to test
gotFull, err := fulls[len(fulls)/2].ShareServ.GetSharesByNamespace(reqCtx, h.DAH, namespace)
fN := fulls[len(fulls)/2]
fnClient := getAdminClient(ctx, fN, t)
gotFull, err := fnClient.Share.GetSharesByNamespace(reqCtx, h.DAH, namespace)
require.NoError(t, err)
require.True(t, len(gotFull[0].Shares) > 0)

gotLight, err := light.ShareServ.GetSharesByNamespace(reqCtx, h.DAH, namespace)
gotLight, err := lightClient.Share.GetSharesByNamespace(reqCtx, h.DAH, namespace)
require.NoError(t, err)
require.True(t, len(gotLight[0].Shares) > 0)

39 changes: 31 additions & 8 deletions nodebuilder/tests/p2p_test.go
@@ -46,7 +46,10 @@ func TestBridgeNodeAsBootstrapper(t *testing.T) {
require.NoError(t, nd.Start(ctx))
assert.Equal(t, *addr, nd.Bootstrappers[0])
// ensure that node is actually connected to BN
assert.True(t, nd.Host.Network().Connectedness(addr.ID) == network.Connected)
client := getAdminClient(ctx, nd, t)
connectedness, err := client.P2P.Connectedness(ctx, addr.ID)
require.NoError(t, err)
assert.Equal(t, connectedness, network.Connected)
}
}

@@ -102,15 +105,22 @@ func TestFullDiscoveryViaBootstrapper(t *testing.T) {
for index := range nodes {
require.NoError(t, nodes[index].Start(ctx))
assert.Equal(t, *bootstrapper, nodes[index].Bootstrappers[0])
assert.True(t, nodes[index].Host.Network().Connectedness(bootstrapper.ID) == network.Connected)
// ensure that node is actually connected to BN
client := getAdminClient(ctx, nodes[index], t)
connectedness, err := client.P2P.Connectedness(ctx, bootstrapper.ID)
require.NoError(t, err)
assert.Equal(t, connectedness, network.Connected)
}

for {
if ctx.Err() != nil {
t.Fatal(ctx.Err())
}
if light.Host.Network().Connectedness(host.InfoFromHost(full.Host).ID) == network.Connected {
// LN discovered FN successfully and is now connected
client := getAdminClient(ctx, light, t)
connectedness, err := client.P2P.Connectedness(ctx, host.InfoFromHost(full.Host).ID)
require.NoError(t, err)
if connectedness == network.Connected {
break
}
}
@@ -158,11 +168,19 @@ func TestRestartNodeDiscovery(t *testing.T) {
for index := 0; index < numFulls; index++ {
nodes[index] = sw.NewNodeWithConfig(node.Full, fullCfg, nodesConfig)
require.NoError(t, nodes[index].Start(ctx))
assert.True(t, nodes[index].Host.Network().Connectedness(bridgeAddr.ID) == network.Connected)
client := getAdminClient(ctx, nodes[index], t)
connectedness, err := client.P2P.Connectedness(ctx, bridgeAddr.ID)
require.NoError(t, err)
assert.Equal(t, connectedness, network.Connected)
}

// ensure FNs are connected to each other
require.True(t, nodes[0].Host.Network().Connectedness(nodes[1].Host.ID()) == network.Connected)
fullClient1 := getAdminClient(ctx, nodes[0], t)
fullClient2 := getAdminClient(ctx, nodes[1], t)

connectedness, err := fullClient1.P2P.Connectedness(ctx, nodes[1].Host.ID())
require.NoError(t, err)
assert.Equal(t, connectedness, network.Connected)

// disconnect the FNs
sw.Disconnect(t, nodes[0], nodes[1])
@@ -175,8 +193,13 @@

// ensure that the FN with disabled discovery is discovered by both of the
// running FNs that have discovery enabled
require.True(t, nodes[0].Host.Network().Connectedness(disabledDiscoveryFN.Host.ID()) == network.Connected)
require.True(t, nodes[1].Host.Network().Connectedness(disabledDiscoveryFN.Host.ID()) == network.Connected)
connectedness, err = fullClient1.P2P.Connectedness(ctx, disabledDiscoveryFN.Host.ID())
require.NoError(t, err)
assert.Equal(t, connectedness, network.Connected)

connectedness, err = fullClient2.P2P.Connectedness(ctx, disabledDiscoveryFN.Host.ID())
require.NoError(t, err)
assert.Equal(t, connectedness, network.Connected)
}

func setTimeInterval(cfg *nodebuilder.Config, interval time.Duration) {
(Diffs for the remaining 2 changed files were not loaded in this view.)

0 comments on commit 84b5988
