Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(shwap): update daser for shwap #3614

Merged
merged 1 commit into from
Aug 1, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion das/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import (
libhead "github.com/celestiaorg/go-header"

"github.com/celestiaorg/celestia-node/header"
"github.com/celestiaorg/celestia-node/share/p2p/shrexsub"
"github.com/celestiaorg/celestia-node/share/shwap/p2p/shrex/shrexsub"
)

// samplingCoordinator runs and coordinates sampling workers and updates current sampling state
Expand Down
2 changes: 1 addition & 1 deletion das/coordinator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import (

"github.com/celestiaorg/celestia-node/header"
"github.com/celestiaorg/celestia-node/share"
"github.com/celestiaorg/celestia-node/share/p2p/shrexsub"
"github.com/celestiaorg/celestia-node/share/shwap/p2p/shrex/shrexsub"
)

func TestCoordinator(t *testing.T) {
Expand Down
2 changes: 1 addition & 1 deletion das/daser.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import (
"github.com/celestiaorg/celestia-node/pruner"
"github.com/celestiaorg/celestia-node/share"
"github.com/celestiaorg/celestia-node/share/eds/byzantine"
"github.com/celestiaorg/celestia-node/share/p2p/shrexsub"
"github.com/celestiaorg/celestia-node/share/shwap/p2p/shrex/shrexsub"
)

var log = logging.Logger("das")
Expand Down
213 changes: 104 additions & 109 deletions das/daser_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,37 +2,27 @@ package das

import (
"context"
"fmt"
"strconv"
"testing"
"time"

"github.com/golang/mock/gomock"
"github.com/ipfs/boxo/blockservice"
"github.com/ipfs/go-datastore"
ds_sync "github.com/ipfs/go-datastore/sync"
pubsub "github.com/libp2p/go-libp2p-pubsub"
mocknet "github.com/libp2p/go-libp2p/p2p/net/mock"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/tendermint/tendermint/types"

"github.com/celestiaorg/go-fraud"
"github.com/celestiaorg/go-fraud/fraudserv"
"github.com/celestiaorg/go-fraud/fraudtest"
libhead "github.com/celestiaorg/go-header"

"github.com/celestiaorg/celestia-node/header"
"github.com/celestiaorg/celestia-node/header/headertest"
headerfraud "github.com/celestiaorg/celestia-node/header/headertest/fraud"
"github.com/celestiaorg/celestia-node/pruner"
"github.com/celestiaorg/celestia-node/share"
"github.com/celestiaorg/celestia-node/share/availability/full"
"github.com/celestiaorg/celestia-node/share/availability/light"
"github.com/celestiaorg/celestia-node/share/availability/mocks"
availability_test "github.com/celestiaorg/celestia-node/share/availability/test"
"github.com/celestiaorg/celestia-node/share/eds/byzantine"
"github.com/celestiaorg/celestia-node/share/getters"
"github.com/celestiaorg/celestia-node/share/ipld"
"github.com/celestiaorg/celestia-node/share/eds/edstest"
)

var timeout = time.Second * 15
Expand All @@ -41,10 +31,11 @@ var timeout = time.Second * 15
// the DASer checkpoint is updated to network head.
func TestDASerLifecycle(t *testing.T) {
ds := ds_sync.MutexWrap(datastore.NewMapDatastore())
bServ := ipld.NewMemBlockservice()
avail := light.TestAvailability(getters.NewIPLDGetter(bServ))
ctrl := gomock.NewController(t)
avail := mocks.NewMockAvailability(ctrl)
avail.EXPECT().SharesAvailable(gomock.Any(), gomock.Any()).AnyTimes().Return(nil)
// 15 headers from the past and 15 future headers
mockGet, sub, mockService := createDASerSubcomponents(t, bServ, 15, 15)
mockGet, sub, mockService := createDASerSubcomponents(t, 15, 15)

ctx, cancel := context.WithTimeout(context.Background(), timeout)
t.Cleanup(cancel)
Expand Down Expand Up @@ -81,10 +72,11 @@ func TestDASerLifecycle(t *testing.T) {

func TestDASer_Restart(t *testing.T) {
ds := ds_sync.MutexWrap(datastore.NewMapDatastore())
bServ := ipld.NewMemBlockservice()
avail := light.TestAvailability(getters.NewIPLDGetter(bServ))
ctrl := gomock.NewController(t)
avail := mocks.NewMockAvailability(ctrl)
avail.EXPECT().SharesAvailable(gomock.Any(), gomock.Any()).AnyTimes().Return(nil)
// 15 headers from the past and 15 future headers
mockGet, sub, mockService := createDASerSubcomponents(t, bServ, 15, 15)
mockGet, sub, mockService := createDASerSubcomponents(t, 15, 15)

ctx, cancel := context.WithTimeout(context.Background(), timeout)
t.Cleanup(cancel)
Expand All @@ -109,10 +101,10 @@ func TestDASer_Restart(t *testing.T) {
require.NoError(t, err)

// reset mockGet, generate 15 "past" headers, building off chain head which is 30
mockGet.generateHeaders(t, bServ, 30, 45)
mockGet.generateHeaders(t, 30, 45)
mockGet.doneCh = make(chan struct{})
// reset dummy subscriber
mockGet.fillSubWithHeaders(t, sub, bServ, 45, 60)
mockGet.fillSubWithHeaders(t, sub, 45, 60)
// manually set mockGet head to trigger finished at 45
mockGet.head = int64(45)

Expand Down Expand Up @@ -144,73 +136,76 @@ func TestDASer_Restart(t *testing.T) {
assert.EqualValues(t, 60, checkpoint.SampleFrom-1)
}

func TestDASer_stopsAfter_BEFP(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*20)
t.Cleanup(cancel)

ds := ds_sync.MutexWrap(datastore.NewMapDatastore())
bServ := ipld.NewMemBlockservice()
// create mock network
net, err := mocknet.FullMeshLinked(1)
require.NoError(t, err)
// create pubsub for host
ps, err := pubsub.NewGossipSub(ctx, net.Hosts()[0],
pubsub.WithMessageSignaturePolicy(pubsub.StrictNoSign))
require.NoError(t, err)
avail := full.TestAvailability(t, getters.NewIPLDGetter(bServ))
// 15 headers from the past and 15 future headers
mockGet, sub, _ := createDASerSubcomponents(t, bServ, 15, 15)

// create fraud service and break one header
getter := func(ctx context.Context, height uint64) (*header.ExtendedHeader, error) {
return mockGet.GetByHeight(ctx, height)
}
headGetter := func(ctx context.Context) (*header.ExtendedHeader, error) {
return mockGet.Head(ctx)
}
unmarshaler := fraud.MultiUnmarshaler[*header.ExtendedHeader]{
Unmarshalers: map[fraud.ProofType]func([]byte) (fraud.Proof[*header.ExtendedHeader], error){
byzantine.BadEncoding: func(data []byte) (fraud.Proof[*header.ExtendedHeader], error) {
befp := &byzantine.BadEncodingProof{}
return befp, befp.UnmarshalBinary(data)
},
},
}

fserv := fraudserv.NewProofService[*header.ExtendedHeader](ps,
net.Hosts()[0],
getter,
headGetter,
unmarshaler,
ds,
false,
"private",
)
require.NoError(t, fserv.Start(ctx))
mockGet.headers[1] = headerfraud.CreateFraudExtHeader(t, mockGet.headers[1], bServ)
newCtx := context.Background()

// create and start DASer
daser, err := NewDASer(avail, sub, mockGet, ds, fserv, newBroadcastMock(1))
require.NoError(t, err)

resultCh := make(chan error)
go fraud.OnProof[*header.ExtendedHeader](newCtx, fserv, byzantine.BadEncoding,
func(fraud.Proof[*header.ExtendedHeader]) {
resultCh <- daser.Stop(newCtx)
})

require.NoError(t, daser.Start(newCtx))
// wait for fraud proof will be handled
select {
case <-ctx.Done():
t.Fatal(ctx.Err())
case res := <-resultCh:
require.NoError(t, res)
}
// wait for manager to finish catchup
require.True(t, daser.running == 0)
}
// TODO(@walldiss): BEFP test will not work until BEFP-shwap integration
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not just t.Skip() ?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I presumed it doesn't compile

//func TestDASer_stopsAfter_BEFP(t *testing.T) {
// ctx, cancel := context.WithTimeout(context.Background(), time.Second*20)
// t.Cleanup(cancel)
//
// ds := ds_sync.MutexWrap(datastore.NewMapDatastore())
// // create mock network
// net, err := mocknet.FullMeshLinked(1)
// require.NoError(t, err)
// // create pubsub for host
// ps, err := pubsub.NewGossipSub(ctx, net.Hosts()[0],
// pubsub.WithMessageSignaturePolicy(pubsub.StrictNoSign))
// require.NoError(t, err)
//
// ctrl := gomock.NewController(t)
// avail := mocks.NewMockAvailability(ctrl)
// avail.EXPECT().SharesAvailable(gomock.Any(), gomock.Any()).AnyTimes().Return(nil)
// // 15 headers from the past and 15 future headers
// mockGet, sub, _ := createDASerSubcomponents(t, 15, 15)
//
// // create fraud service and break one header
// getter := func(ctx context.Context, height uint64) (*header.ExtendedHeader, error) {
// return mockGet.GetByHeight(ctx, height)
// }
// headGetter := func(ctx context.Context) (*header.ExtendedHeader, error) {
// return mockGet.Head(ctx)
// }
// unmarshaler := fraud.MultiUnmarshaler[*header.ExtendedHeader]{
// Unmarshalers: map[fraud.ProofType]func([]byte) (fraud.Proof[*header.ExtendedHeader], error){
// byzantine.BadEncoding: func(data []byte) (fraud.Proof[*header.ExtendedHeader], error) {
// befp := &byzantine.BadEncodingProof{}
// return befp, befp.UnmarshalBinary(data)
// },
// },
// }
//
// fserv := fraudserv.NewProofService[*header.ExtendedHeader](ps,
// net.Hosts()[0],
// getter,
// headGetter,
// unmarshaler,
// ds,
// false,
// "private",
// )
// require.NoError(t, fserv.Start(ctx))
// mockGet.headers[1] = headerfraud.CreateFraudExtHeader(t, mockGet.headers[1])
// newCtx := context.Background()
//
// // create and start DASer
// daser, err := NewDASer(avail, sub, mockGet, ds, fserv, newBroadcastMock(1))
// require.NoError(t, err)
//
// resultCh := make(chan error)
// go fraud.OnProof[*header.ExtendedHeader](newCtx, fserv, byzantine.BadEncoding,
// func(fraud.Proof[*header.ExtendedHeader]) {
// resultCh <- daser.Stop(newCtx)
// })
//
// require.NoError(t, daser.Start(newCtx))
// // wait for fraud proof will be handled
// select {
// case <-ctx.Done():
// t.Fatal(ctx.Err())
// case res := <-resultCh:
// require.NoError(t, res)
// }
// // wait for manager to finish catchup
// require.True(t, daser.running == 0)
//}

func TestDASerSampleTimeout(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
Expand All @@ -219,17 +214,18 @@ func TestDASerSampleTimeout(t *testing.T) {
getter := getterStub{}
avail := mocks.NewMockAvailability(gomock.NewController(t))
doneCh := make(chan struct{})
avail.EXPECT().SharesAvailable(gomock.Any(), gomock.Any()).DoAndReturn(
func(sampleCtx context.Context, h *header.ExtendedHeader) error {
select {
case <-sampleCtx.Done():
close(doneCh)
return nil
case <-ctx.Done():
t.Fatal("call context didn't timeout in time")
return ctx.Err()
}
})
avail.EXPECT().SharesAvailable(gomock.Any(), gomock.Any()).AnyTimes().
DoAndReturn(
func(sampleCtx context.Context, h *header.ExtendedHeader) error {
select {
case <-sampleCtx.Done():
close(doneCh)
return nil
case <-ctx.Done():
t.Fatal("call context didn't timeout in time")
return ctx.Err()
}
})

ds := ds_sync.MutexWrap(datastore.NewMapDatastore())
sub := new(headertest.Subscriber)
Expand Down Expand Up @@ -295,18 +291,16 @@ func TestDASer_SamplingWindow(t *testing.T) {
// mockGetter, share.Availability, and mock header.Subscriber.
func createDASerSubcomponents(
t *testing.T,
bServ blockservice.BlockService,
numGetter,
numSub int,
) (*mockGetter, *headertest.Subscriber, *fraudtest.DummyService[*header.ExtendedHeader]) {
mockGet, sub := createMockGetterAndSub(t, bServ, numGetter, numSub)
mockGet, sub := createMockGetterAndSub(t, numGetter, numSub)
fraud := &fraudtest.DummyService[*header.ExtendedHeader]{}
return mockGet, sub, fraud
}

func createMockGetterAndSub(
t *testing.T,
bServ blockservice.BlockService,
numGetter,
numSub int,
) (*mockGetter, *headertest.Subscriber) {
Expand All @@ -316,27 +310,25 @@ func createMockGetterAndSub(
brokenHeightCh: make(chan struct{}),
}

mockGet.generateHeaders(t, bServ, 0, numGetter)
mockGet.generateHeaders(t, 0, numGetter)

sub := new(headertest.Subscriber)
mockGet.fillSubWithHeaders(t, sub, bServ, numGetter, numGetter+numSub)
mockGet.fillSubWithHeaders(t, sub, numGetter, numGetter+numSub)
return mockGet, sub
}

// fillSubWithHeaders generates `num` headers from the future for p2pSub to pipe through to DASer.
func (m *mockGetter) fillSubWithHeaders(
t *testing.T,
sub *headertest.Subscriber,
bServ blockservice.BlockService,
startHeight,
endHeight int,
) {
sub.Headers = make([]*header.ExtendedHeader, endHeight-startHeight)

index := 0
for i := startHeight; i < endHeight; i++ {
roots := availability_test.RandFillBS(t, 16, bServ)

roots := edstest.RandomAxisRoots(t, 16)
randHeader := headertest.RandExtendedHeaderWithRoot(t, roots)
randHeader.RawHeader.Height = int64(i + 1)

Expand All @@ -359,9 +351,9 @@ type mockGetter struct {
headers map[int64]*header.ExtendedHeader
}

func (m *mockGetter) generateHeaders(t *testing.T, bServ blockservice.BlockService, startHeight, endHeight int) {
func (m *mockGetter) generateHeaders(t *testing.T, startHeight, endHeight int) {
for i := startHeight; i < endHeight; i++ {
roots := availability_test.RandFillBS(t, 16, bServ)
roots := edstest.RandomAxisRoots(t, 16)

randHeader := headertest.RandExtendedHeaderWithRoot(t, roots)
randHeader.RawHeader.Height = int64(i + 1)
Expand Down Expand Up @@ -397,7 +389,10 @@ func (m *mockGetter) GetByHeight(_ context.Context, height uint64) (*header.Exte
}
}()

return m.headers[int64(height)], nil
if h, ok := m.headers[int64(height)]; ok {
return h, nil
}
return nil, fmt.Errorf("header not found")
}

type benchGetterStub struct {
Expand Down
2 changes: 1 addition & 1 deletion das/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (
libhead "github.com/celestiaorg/go-header"

"github.com/celestiaorg/celestia-node/header"
"github.com/celestiaorg/celestia-node/share/p2p/shrexsub"
"github.com/celestiaorg/celestia-node/share/shwap/p2p/shrex/shrexsub"
)

type jobType string
Expand Down
4 changes: 0 additions & 4 deletions header/headertest/fraud/testing.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"testing"
"time"

"github.com/ipfs/boxo/blockservice"
"github.com/stretchr/testify/require"
"github.com/tendermint/tendermint/libs/bytes"
tmproto "github.com/tendermint/tendermint/proto/tendermint/types"
Expand Down Expand Up @@ -89,11 +88,8 @@ func (f *FraudMaker) MakeExtendedHeader(odsSize int, edsStore *eds.Store) header
func CreateFraudExtHeader(
t *testing.T,
eh *header.ExtendedHeader,
serv blockservice.BlockService,
) *header.ExtendedHeader {
square := edstest.RandByzantineEDS(t, len(eh.DAH.RowRoots))
err := ipld.ImportEDS(context.Background(), square, serv)
require.NoError(t, err)
dah, err := da.NewDataAvailabilityHeader(square)
require.NoError(t, err)
eh.DAH = &dah
Expand Down
Loading