Skip to content

Commit

Permalink
Restart Push integration tests and corresponding fixes (#78)
Browse files Browse the repository at this point in the history
* restart push integration test
  • Loading branch information
aarshkshah1992 authored Sep 17, 2020
1 parent 6f2bad8 commit 4d3b39a
Show file tree
Hide file tree
Showing 20 changed files with 2,926 additions and 43 deletions.
6 changes: 6 additions & 0 deletions channels/channel_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ import (

// channelState is immutable channel data plus mutable state
type channelState struct {
// peerId of the manager peer
managerPeer peer.ID
// an identifier for this channel shared by request and responder, set by requester through protocol
transferID datatransfer.TransferID
// base CID for the piece being transferred
Expand Down Expand Up @@ -154,4 +156,8 @@ func (c channelState) OtherParty(thisParty peer.ID) peer.ID {
return c.sender
}

func (c channelState) ManagerPeer() peer.ID {
return c.managerPeer
}

var _ datatransfer.ChannelState = channelState{}
17 changes: 9 additions & 8 deletions channels/channels.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ func (c *Channels) dispatch(eventName fsm.EventName, channel fsm.StateType) {

// CreateNew creates a new channel id and channel state and saves to channels.
// returns error if the channel exists already.
func (c *Channels) CreateNew(tid datatransfer.TransferID, baseCid cid.Cid, selector ipld.Node, voucher datatransfer.Voucher, initiator, dataSender, dataReceiver peer.ID) (datatransfer.ChannelID, error) {
func (c *Channels) CreateNew(managerPeer peer.ID, tid datatransfer.TransferID, baseCid cid.Cid, selector ipld.Node, voucher datatransfer.Voucher, initiator, dataSender, dataReceiver peer.ID) (datatransfer.ChannelID, error) {
var responder peer.ID
if dataSender == initiator {
responder = dataReceiver
Expand All @@ -107,13 +107,14 @@ func (c *Channels) CreateNew(tid datatransfer.TransferID, baseCid cid.Cid, selec
return datatransfer.ChannelID{}, err
}
err = c.statemachines.Begin(chid, &internalChannelState{
TransferID: tid,
Initiator: initiator,
Responder: responder,
BaseCid: baseCid,
Selector: &cbg.Deferred{Raw: selBytes},
Sender: dataSender,
Recipient: dataReceiver,
ManagerPeer: managerPeer,
TransferID: tid,
Initiator: initiator,
Responder: responder,
BaseCid: baseCid,
Selector: &cbg.Deferred{Raw: selBytes},
Sender: dataSender,
Recipient: dataReceiver,
Vouchers: []encodedVoucher{
{
Type: voucher.Type(),
Expand Down
2 changes: 1 addition & 1 deletion channels/channels_fsm.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ var log = logging.Logger("data-transfer")
var ChannelEvents = fsm.Events{
fsm.Event(datatransfer.Open).FromAny().To(datatransfer.Requested),
fsm.Event(datatransfer.Accept).From(datatransfer.Requested).To(datatransfer.Ongoing),
fsm.Event(datatransfer.Restart).FromAny().ToNoChange(),
fsm.Event(datatransfer.Restart).FromAny().To(datatransfer.Ongoing),

fsm.Event(datatransfer.Cancel).FromAny().To(datatransfer.Cancelling),

Expand Down
13 changes: 8 additions & 5 deletions channels/channels_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,23 +51,25 @@ func TestChannels(t *testing.T) {
peers := testutil.GeneratePeers(4)

t.Run("adding channels", func(t *testing.T) {
chid, err := channelList.CreateNew(tid1, cids[0], selector, fv1, peers[0], peers[0], peers[1])
chid, err := channelList.CreateNew(peers[0], tid1, cids[0], selector, fv1, peers[0], peers[0], peers[1])
require.NoError(t, err)
require.Equal(t, peers[0], chid.Initiator)
require.Equal(t, tid1, chid.ID)

// cannot add twice for same channel id
_, err = channelList.CreateNew(tid1, cids[1], selector, fv2, peers[0], peers[1], peers[0])
_, err = channelList.CreateNew(peers[0], tid1, cids[1], selector, fv2, peers[0], peers[1], peers[0])
require.Error(t, err)
state := checkEvent(ctx, t, received, datatransfer.Open)
require.Equal(t, datatransfer.Requested, state.Status())

// can add for different id
chid, err = channelList.CreateNew(tid2, cids[1], selector, fv2, peers[3], peers[2], peers[3])
chid, err = channelList.CreateNew(peers[0], tid2, cids[1], selector, fv2, peers[3], peers[2], peers[3])
require.NoError(t, err)
require.Equal(t, peers[3], chid.Initiator)
require.Equal(t, tid2, chid.ID)
state = checkEvent(ctx, t, received, datatransfer.Open)
require.Equal(t, datatransfer.Requested, state.Status())
require.Equal(t, peers[0], state.ManagerPeer())
})

t.Run("in progress channels", func(t *testing.T) {
Expand Down Expand Up @@ -97,6 +99,7 @@ func TestChannels(t *testing.T) {
state, err = channelList.GetByID(ctx, datatransfer.ChannelID{Initiator: peers[3], Responder: peers[2], ID: tid2})
require.NotEqual(t, nil, state)
require.NoError(t, err)
require.Equal(t, peers[0], state.ManagerPeer())
})

t.Run("accept", func(t *testing.T) {
Expand All @@ -118,7 +121,7 @@ func TestChannels(t *testing.T) {
channelList, err := channels.New(ds, notifier, decoderByType, decoderByType, &fakeEnv{})
require.NoError(t, err)

_, err = channelList.CreateNew(tid1, cids[0], selector, fv1, peers[0], peers[0], peers[1])
_, err = channelList.CreateNew(peers[0], tid1, cids[0], selector, fv1, peers[0], peers[0], peers[1])
require.NoError(t, err)
state := checkEvent(ctx, t, received, datatransfer.Open)
require.Equal(t, datatransfer.Requested, state.Status())
Expand Down Expand Up @@ -247,7 +250,7 @@ func TestChannels(t *testing.T) {
state = checkEvent(ctx, t, received, datatransfer.CleanupComplete)
require.Equal(t, datatransfer.Failed, state.Status())

chid, err := channelList.CreateNew(tid2, cids[1], selector, fv2, peers[2], peers[1], peers[2])
chid, err := channelList.CreateNew(peers[0], tid2, cids[1], selector, fv2, peers[2], peers[1], peers[2])
require.NoError(t, err)
require.Equal(t, peers[2], chid.Initiator)
require.Equal(t, tid2, chid.ID)
Expand Down
3 changes: 3 additions & 0 deletions channels/internalchannel.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ type encodedVoucherResult struct {
}

type internalChannelState struct {
// PeerId of the manager peer
ManagerPeer peer.ID
// an identifier for this channel shared by request and responder, set by requester through protocol
TransferID datatransfer.TransferID
// Initiator is the person who intiated this datatransfer request
Expand Down Expand Up @@ -58,6 +60,7 @@ type internalChannelState struct {

func (c internalChannelState) ToChannelState(voucherDecoder DecoderByTypeFunc, voucherResultDecoder DecoderByTypeFunc) datatransfer.ChannelState {
return channelState{
managerPeer: c.ManagerPeer,
isPull: c.Initiator == c.Recipient,
transferID: c.TransferID,
baseCid: c.BaseCid,
Expand Down
26 changes: 24 additions & 2 deletions channels/internalchannel_cbor_gen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit 4d3b39a

Please sign in to comment.