Skip to content

Commit

Permalink
Fix setup.TestCloseLoop.
Browse files Browse the repository at this point in the history
  • Loading branch information
Evan Lin committed Jul 29, 2019
1 parent 62daee5 commit c522b90
Show file tree
Hide file tree
Showing 4 changed files with 70 additions and 132 deletions.
2 changes: 1 addition & 1 deletion pkg/app/packet_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,5 +20,5 @@ func ExamplePacket() {

// Output: skywire
// 000000000000000000000000000000000000000000000000000000000000000000:0
// :0 <-> 000000000000000000000000000000000000000000000000000000000000000000:0
// 000000000000000000000000000000000000000000000000000000000000000000:0 <-> 000000000000000000000000000000000000000000000000000000000000000000:0
}
12 changes: 3 additions & 9 deletions pkg/setup/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,19 +93,13 @@ func (sn *Node) serveTransport(tr transport.Transport) error {
case PacketCloseLoop:
var ld routing.LoopData
if err = json.Unmarshal(data, &ld); err == nil {
remote, ok := sn.remote(tr.Edges())
if !ok {
if _, ok := sn.remote(tr.Edges()); !ok {
return errors.New("configured PubKey not found in edges")
}
err = sn.closeLoop(ld.Loop.Remote.PubKey, routing.LoopData{
Loop: routing.Loop{
Remote: routing.Addr{
PubKey: remote,
Port: ld.Loop.Local.Port,
},
Local: routing.Addr{
Port: ld.Loop.Remote.Port,
},
Remote: ld.Loop.Local,
Local: ld.Loop.Remote,
},
})
}
Expand Down
160 changes: 66 additions & 94 deletions pkg/setup/node_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ func TestNode(t *testing.T) {
for i := 0; i < n; i++ {
pk, sk, err := cipher.GenerateDeterministicKeyPair([]byte{byte(i)})
require.NoError(t, err)
t.Logf("> client[%d] PK: %s\n", i, pk)
t.Logf("client[%d] PK: %s\n", i, pk)
c := dmsg.NewClient(pk, sk, discovery, dmsg.SetLogger(logging.MustGetLogger(fmt.Sprintf("client_%d:%s", i, pk))))
require.NoError(t, c.InitiateServerConnections(context.TODO(), 1))
clients[i] = c
Expand Down Expand Up @@ -153,7 +153,7 @@ func TestNode(t *testing.T) {
}

// TODO: This error is not checked due to a bug in dmsg.
_ = proto.WritePacket(RespSuccess, rIDs)
_ = proto.WritePacket(RespSuccess, rIDs) //nolint:errcheck
}

// CLOSURE: emulates how a visor node should react when expecting an ConfirmLoop packet.
Expand Down Expand Up @@ -182,7 +182,7 @@ func TestNode(t *testing.T) {
}

// TODO: This error is not checked due to a bug in dmsg.
_ = proto.WritePacket(RespSuccess, nil)
_ = proto.WritePacket(RespSuccess, nil) //nolint:errcheck
}

expectAddRules(4, routing.RuleApp)
Expand All @@ -200,98 +200,70 @@ func TestNode(t *testing.T) {
// TEST: Emulates the communication between 2 visor nodes and a setup nodes,
// where a route is already established,
// and the first client attempts to tear it down.
//t.Run("CloseLoop", func(t *testing.T) {
// t.SkipNow()
//
// // clients index 0 and 1 are for visor nodes.
// // clients index 2 is for setup node.
// clients, closeClients := prepClients(3)
// defer closeClients()
//
// // prepare and serve setup node.
// sn, closeSetup := prepSetupNode(clients[0])
// setupPK := sn.messenger.Local()
// defer closeSetup()
//
// // prepare loop data describing the loop that is to be closed.
// ld := routing.LoopData{
// Loop: routing.Loop{
// Local: routing.Addr{
// PubKey: clients[1].Local(),
// Port: 1,
// },
// Remote: routing.Addr{
// PubKey: clients[2].Local(),
// Port: 2,
// },
// },
// RouteID: 3,
// }
//
// iTpErrs := make(chan error, 3)
// var iTp transport.Transport
// go func() {
// tp, err := clients[1].Dial(context.TODO(), setupPK)
// iTp = tp
// iTpErrs <- err
// iTpErrs <- CloseLoop(NewSetupProtocol(tp), ld)
// iTpErrs <- iTp.Close()
// close(iTpErrs)
// }()
// defer func() {
// i := 0
// for err := range iTpErrs {
// require.NoError(t, err, i)
// i++
// }
// }()
//})
}
t.Run("CloseLoop", func(t *testing.T) {

// client index 0 is for setup node.
// clients index 1 and 2 are for visor nodes.
clients, closeClients := prepClients(3)
defer closeClients()

// prepare and serve setup node.
sn, closeSetup := prepSetupNode(clients[0])
setupPK := sn.messenger.Local()
defer closeSetup()

// prepare loop data describing the loop that is to be closed.
ld := routing.LoopData{
Loop: routing.Loop{
Local: routing.Addr{
PubKey: clients[1].Local(),
Port: 1,
},
Remote: routing.Addr{
PubKey: clients[2].Local(),
Port: 2,
},
},
RouteID: 3,
}

//func TestCloseLoop(t *testing.T) {
// dc := disc.NewMock()
//
// pk1, sk1 := cipher.GenerateKeyPair()
// pk3, sk3 := cipher.GenerateKeyPair()
//
// n3, srvErrCh := createServer(t, dc)
//
// time.Sleep(100 * time.Millisecond)
//
// c1 := dmsg.NewClient(pk1, sk1, dc)
// c3 := dmsg.NewClient(pk3, sk3, dc)
//
// require.NoError(t, c1.InitiateServerConnections(context.Background(), 1))
// require.NoError(t, c3.InitiateServerConnections(context.Background(), 1))
//
// sn := &Node{logging.MustGetLogger("setup_node"), c3, 0, metrics.NewDummy()}
// errChan := make(chan error)
// go func() {
// errChan <- sn.Serve(context.TODO())
// }()
//
// tr, err := c1.Dial(context.TODO(), pk3)
// require.NoError(t, err)
//
// proto := NewSetupProtocol(tr)
// require.NoError(t, CloseLoop(proto, routing.LoopData{
// Loop: routing.Loop{
// Remote: routing.Addr{
// PubKey: pk3,
// Port: 2,
// },
// Local: routing.Addr{
// Port: 1,
// },
// },
// }))
//
// require.NoError(t, sn.Close())
// require.NoError(t, <-errChan)
//
// require.NoError(t, n3.Close())
// require.NoError(t, errWithTimeout(srvErrCh))
//}
// client_1 initiates close loop with setup node.
iTp, err := clients[1].Dial(context.TODO(), setupPK)
require.NoError(t, err)
iTpErrs := make(chan error, 2)
go func() {
iTpErrs <- CloseLoop(NewSetupProtocol(iTp), ld)
iTpErrs <- iTp.Close()
close(iTpErrs)
}()
defer func() {
i := 0
for err := range iTpErrs {
require.NoError(t, err, i)
i++
}
}()

// client_2 accepts close request.
tp, err := clients[2].Accept(context.TODO())
require.NoError(t, err)
defer func() { require.NoError(t, tp.Close()) }()

proto := NewSetupProtocol(tp)

pt, pp, err := proto.ReadPacket()
require.NoError(t, err)
require.Equal(t, PacketLoopClosed, pt)

var d routing.LoopData
require.NoError(t, json.Unmarshal(pp, &d))
require.Equal(t, ld.Loop.Remote, d.Loop.Local)
require.Equal(t, ld.Loop.Local, d.Loop.Remote)

// TODO: This error is not checked due to a bug in dmsg.
_ = proto.WritePacket(RespSuccess, nil) //nolint:errcheck
})
}

func createServer(t *testing.T, dc disc.APIClient) (srv *dmsg.Server, srvErr <-chan error) {
pk, sk, err := cipher.GenerateDeterministicKeyPair([]byte("s"))
Expand Down
28 changes: 0 additions & 28 deletions pkg/setup/protocol.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,6 @@ import (
"fmt"
"io"

"github.com/skycoin/dmsg/cipher"

"github.com/skycoin/skywire/pkg/transport/dmsg"

"github.com/skycoin/skywire/pkg/routing"
)

Expand Down Expand Up @@ -65,30 +61,6 @@ type Protocol struct {
rw io.ReadWriter
}

func (p *Protocol) pks() string {
humanize := func(pk cipher.PubKey) string {
switch pk.String() {
case "02d75d089b307032a3dfd1e6808f6a4ea0011205f60e7f28f69f444c4e48b2b7c3":
return "1"
case "0217fcf0478d41aa7eab8ae023658910e76e5aeae9bf18fdd607188c290a9be649":
return "2"
case "028e90b6fcb4ce3ecdf0fff555be992ab612d85c00897af8bdabff157916468fc1":
return "3"
case "03739ff49de06eab6b26f55e658c97121457bb690a1d8b55cd892decbc95073b80":
return "4"
case "0345327088f0f34c359ecf67892fbe58a1cd7299e15a797339fa29365cc2e7c551":
return "s"
default:
return fmt.Sprintf("(%s)", pk)
}
}
tp, ok := p.rw.(*dmsg.Transport)
if !ok {
return "[~~]"
}
return fmt.Sprintf("[%s%s]", humanize(tp.LocalPK()), humanize(tp.RemotePK()))
}

// NewSetupProtocol constructs a new setup Protocol.
func NewSetupProtocol(rw io.ReadWriter) *Protocol {
return &Protocol{rw}
Expand Down

0 comments on commit c522b90

Please sign in to comment.