From c522b902ae2ece5fd947288dd030c4b6349c6be9 Mon Sep 17 00:00:00 2001 From: Evan Lin Date: Mon, 29 Jul 2019 23:07:22 +0800 Subject: [PATCH] Fix setup.TestCloseLoop. --- pkg/app/packet_test.go | 2 +- pkg/setup/node.go | 12 +--- pkg/setup/node_test.go | 160 +++++++++++++++++------------------------ pkg/setup/protocol.go | 28 -------- 4 files changed, 70 insertions(+), 132 deletions(-) diff --git a/pkg/app/packet_test.go b/pkg/app/packet_test.go index 386e4fdeef..7990a166b0 100644 --- a/pkg/app/packet_test.go +++ b/pkg/app/packet_test.go @@ -20,5 +20,5 @@ func ExamplePacket() { // Output: skywire // 000000000000000000000000000000000000000000000000000000000000000000:0 - // :0 <-> 000000000000000000000000000000000000000000000000000000000000000000:0 + // 000000000000000000000000000000000000000000000000000000000000000000:0 <-> 000000000000000000000000000000000000000000000000000000000000000000:0 } diff --git a/pkg/setup/node.go b/pkg/setup/node.go index eaa9f6253b..ee6609cd6f 100644 --- a/pkg/setup/node.go +++ b/pkg/setup/node.go @@ -93,19 +93,13 @@ func (sn *Node) serveTransport(tr transport.Transport) error { case PacketCloseLoop: var ld routing.LoopData if err = json.Unmarshal(data, &ld); err == nil { - remote, ok := sn.remote(tr.Edges()) - if !ok { + if _, ok := sn.remote(tr.Edges()); !ok { return errors.New("configured PubKey not found in edges") } err = sn.closeLoop(ld.Loop.Remote.PubKey, routing.LoopData{ Loop: routing.Loop{ - Remote: routing.Addr{ - PubKey: remote, - Port: ld.Loop.Local.Port, - }, - Local: routing.Addr{ - Port: ld.Loop.Remote.Port, - }, + Remote: ld.Loop.Local, + Local: ld.Loop.Remote, }, }) } diff --git a/pkg/setup/node_test.go b/pkg/setup/node_test.go index 63c147f2f4..930b21f74d 100644 --- a/pkg/setup/node_test.go +++ b/pkg/setup/node_test.go @@ -56,7 +56,7 @@ func TestNode(t *testing.T) { for i := 0; i < n; i++ { pk, sk, err := cipher.GenerateDeterministicKeyPair([]byte{byte(i)}) require.NoError(t, err) - t.Logf("> client[%d] PK: %s\n", i, pk) + t.Logf("client[%d] PK: %s\n", i, pk) c := dmsg.NewClient(pk, sk, discovery, dmsg.SetLogger(logging.MustGetLogger(fmt.Sprintf("client_%d:%s", i, pk)))) require.NoError(t, c.InitiateServerConnections(context.TODO(), 1)) clients[i] = c @@ -153,7 +153,7 @@ func TestNode(t *testing.T) { } // TODO: This error is not checked due to a bug in dmsg. - _ = proto.WritePacket(RespSuccess, rIDs) + _ = proto.WritePacket(RespSuccess, rIDs) //nolint:errcheck } // CLOSURE: emulates how a visor node should react when expecting an ConfirmLoop packet. @@ -182,7 +182,7 @@ func TestNode(t *testing.T) { } // TODO: This error is not checked due to a bug in dmsg. - _ = proto.WritePacket(RespSuccess, nil) + _ = proto.WritePacket(RespSuccess, nil) //nolint:errcheck } expectAddRules(4, routing.RuleApp) @@ -200,98 +200,70 @@ func TestNode(t *testing.T) { // TEST: Emulates the communication between 2 visor nodes and a setup nodes, // where a route is already established, // and the first client attempts to tear it down. - //t.Run("CloseLoop", func(t *testing.T) { - // t.SkipNow() - // - // // clients index 0 and 1 are for visor nodes. - // // clients index 2 is for setup node. - // clients, closeClients := prepClients(3) - // defer closeClients() - // - // // prepare and serve setup node. - // sn, closeSetup := prepSetupNode(clients[0]) - // setupPK := sn.messenger.Local() - // defer closeSetup() - // - // // prepare loop data describing the loop that is to be closed. - // ld := routing.LoopData{ - // Loop: routing.Loop{ - // Local: routing.Addr{ - // PubKey: clients[1].Local(), - // Port: 1, - // }, - // Remote: routing.Addr{ - // PubKey: clients[2].Local(), - // Port: 2, - // }, - // }, - // RouteID: 3, - // } - // - // iTpErrs := make(chan error, 3) - // var iTp transport.Transport - // go func() { - // tp, err := clients[1].Dial(context.TODO(), setupPK) - // iTp = tp - // iTpErrs <- err - // iTpErrs <- CloseLoop(NewSetupProtocol(tp), ld) - // iTpErrs <- iTp.Close() - // close(iTpErrs) - // }() - // defer func() { - // i := 0 - // for err := range iTpErrs { - // require.NoError(t, err, i) - // i++ - // } - // }() - //}) -} + t.Run("CloseLoop", func(t *testing.T) { + + // client index 0 is for setup node. + // clients index 1 and 2 are for visor nodes. + clients, closeClients := prepClients(3) + defer closeClients() + + // prepare and serve setup node. + sn, closeSetup := prepSetupNode(clients[0]) + setupPK := sn.messenger.Local() + defer closeSetup() + + // prepare loop data describing the loop that is to be closed. + ld := routing.LoopData{ + Loop: routing.Loop{ + Local: routing.Addr{ + PubKey: clients[1].Local(), + Port: 1, + }, + Remote: routing.Addr{ + PubKey: clients[2].Local(), + Port: 2, + }, + }, + RouteID: 3, + } -//func TestCloseLoop(t *testing.T) { -// dc := disc.NewMock() -// -// pk1, sk1 := cipher.GenerateKeyPair() -// pk3, sk3 := cipher.GenerateKeyPair() -// -// n3, srvErrCh := createServer(t, dc) -// -// time.Sleep(100 * time.Millisecond) -// -// c1 := dmsg.NewClient(pk1, sk1, dc) -// c3 := dmsg.NewClient(pk3, sk3, dc) -// -// require.NoError(t, c1.InitiateServerConnections(context.Background(), 1)) -// require.NoError(t, c3.InitiateServerConnections(context.Background(), 1)) -// -// sn := &Node{logging.MustGetLogger("setup_node"), c3, 0, metrics.NewDummy()} -// errChan := make(chan error) -// go func() { -// errChan <- sn.Serve(context.TODO()) -// }() -// -// tr, err := c1.Dial(context.TODO(), pk3) -// require.NoError(t, err) -// -// proto := NewSetupProtocol(tr) -// require.NoError(t, CloseLoop(proto, routing.LoopData{ -// Loop: routing.Loop{ -// Remote: routing.Addr{ -// PubKey: pk3, -// Port: 2, -// }, -// Local: routing.Addr{ -// Port: 1, -// }, -// }, -// })) -// -// require.NoError(t, sn.Close()) -// require.NoError(t, <-errChan) -// -// require.NoError(t, n3.Close()) -// require.NoError(t, errWithTimeout(srvErrCh)) -//} + // client_1 initiates close loop with setup node. + iTp, err := clients[1].Dial(context.TODO(), setupPK) + require.NoError(t, err) + iTpErrs := make(chan error, 2) + go func() { + iTpErrs <- CloseLoop(NewSetupProtocol(iTp), ld) + iTpErrs <- iTp.Close() + close(iTpErrs) + }() + defer func() { + i := 0 + for err := range iTpErrs { + require.NoError(t, err, i) + i++ + } + }() + + // client_2 accepts close request. + tp, err := clients[2].Accept(context.TODO()) + require.NoError(t, err) + defer func() { require.NoError(t, tp.Close()) }() + + proto := NewSetupProtocol(tp) + + pt, pp, err := proto.ReadPacket() + require.NoError(t, err) + require.Equal(t, PacketLoopClosed, pt) + + var d routing.LoopData + require.NoError(t, json.Unmarshal(pp, &d)) + require.Equal(t, ld.Loop.Remote, d.Loop.Local) + require.Equal(t, ld.Loop.Local, d.Loop.Remote) + + // TODO: This error is not checked due to a bug in dmsg. + _ = proto.WritePacket(RespSuccess, nil) //nolint:errcheck + }) +} func createServer(t *testing.T, dc disc.APIClient) (srv *dmsg.Server, srvErr <-chan error) { pk, sk, err := cipher.GenerateDeterministicKeyPair([]byte("s")) diff --git a/pkg/setup/protocol.go b/pkg/setup/protocol.go index 8a748206af..648d95c735 100644 --- a/pkg/setup/protocol.go +++ b/pkg/setup/protocol.go @@ -8,10 +8,6 @@ import ( "fmt" "io" - "github.com/skycoin/dmsg/cipher" - - "github.com/skycoin/skywire/pkg/transport/dmsg" - "github.com/skycoin/skywire/pkg/routing" ) @@ -65,30 +61,6 @@ type Protocol struct { rw io.ReadWriter } -func (p *Protocol) pks() string { - humanize := func(pk cipher.PubKey) string { - switch pk.String() { - case "02d75d089b307032a3dfd1e6808f6a4ea0011205f60e7f28f69f444c4e48b2b7c3": - return "1" - case "0217fcf0478d41aa7eab8ae023658910e76e5aeae9bf18fdd607188c290a9be649": - return "2" - case "028e90b6fcb4ce3ecdf0fff555be992ab612d85c00897af8bdabff157916468fc1": - return "3" - case "03739ff49de06eab6b26f55e658c97121457bb690a1d8b55cd892decbc95073b80": - return "4" - case "0345327088f0f34c359ecf67892fbe58a1cd7299e15a797339fa29365cc2e7c551": - return "s" - default: - return fmt.Sprintf("(%s)", pk) - } - } - tp, ok := p.rw.(*dmsg.Transport) - if !ok { - return "[~~]" - } - return fmt.Sprintf("[%s%s]", humanize(tp.LocalPK()), humanize(tp.RemotePK())) -} - // NewSetupProtocol constructs a new setup Protocol. func NewSetupProtocol(rw io.ReadWriter) *Protocol { return &Protocol{rw}