From ed08a650dc25dc1b5bfc78fb7c630b387158287b Mon Sep 17 00:00:00 2001 From: Sir Darkrengarius Date: Mon, 18 Nov 2019 18:20:46 +0300 Subject: [PATCH] Fix setup node test --- pkg/setup/idreservoir.go | 14 +- pkg/setup/node_test.go | 272 +++++++++++---------------------------- 2 files changed, 78 insertions(+), 208 deletions(-) diff --git a/pkg/setup/idreservoir.go b/pkg/setup/idreservoir.go index bdefa0c256..0f56c9e396 100644 --- a/pkg/setup/idreservoir.go +++ b/pkg/setup/idreservoir.go @@ -55,7 +55,7 @@ func (idr *idReservoir) ReserveIDs(ctx context.Context, log *logging.Logger, dms for pk, n := range idr.rec { pk, n := pk, n - /*go func() { + go func() { ids, err := reserve(ctx, log, dmsgC, pk, n) if err != nil { errCh <- fmt.Errorf("reserve routeID from %s failed: %v", pk, err) @@ -65,17 +65,7 @@ func (idr *idReservoir) ReserveIDs(ctx context.Context, log *logging.Logger, dms idr.ids[pk] = ids idr.mx.Unlock() errCh <- nil - }()*/ - fmt.Printf("Trying to ReserveIDs from %s\n", pk) - ids, err := reserve(ctx, log, dmsgC, pk, n) - if err != nil { - errCh <- fmt.Errorf("reserve routeID from %s failed: %v", pk, err) - continue - } - idr.mx.Lock() - idr.ids[pk] = ids - idr.mx.Unlock() - errCh <- nil + }() } return finalError(len(idr.rec), errCh) diff --git a/pkg/setup/node_test.go b/pkg/setup/node_test.go index e13d8fdf7f..0bd4482eae 100644 --- a/pkg/setup/node_test.go +++ b/pkg/setup/node_test.go @@ -12,23 +12,22 @@ import ( "testing" "time" + "github.com/SkycoinProject/skywire-mainnet/pkg/setup/setupclient" + "github.com/SkycoinProject/skywire-mainnet/internal/testhelpers" "github.com/SkycoinProject/skywire-mainnet/pkg/snet/snettest" "github.com/SkycoinProject/dmsg" "github.com/SkycoinProject/dmsg/cipher" - "github.com/SkycoinProject/dmsg/disc" "github.com/SkycoinProject/skycoin/src/util/logging" - "github.com/google/uuid" - "github.com/stretchr/testify/mock" - "github.com/stretchr/testify/require" - "golang.org/x/net/nettest" - "github.com/SkycoinProject/skywire-mainnet/internal/skyenv" "github.com/SkycoinProject/skywire-mainnet/pkg/metrics" "github.com/SkycoinProject/skywire-mainnet/pkg/router" "github.com/SkycoinProject/skywire-mainnet/pkg/routing" + "github.com/google/uuid" + "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/require" ) func TestMain(m *testing.M) { @@ -46,42 +45,19 @@ func TestMain(m *testing.M) { os.Exit(m.Run()) } -// TODO(Darkren): fix this test. Explanation below -// Test may finish in 3 different ways: -// 1. Pass -// 2. Fail -// 3. Hang -// Adding `time.Sleep` at the start of `Write` operation in the DMSG makes it less possible to hang -// From observations seems like something's wrong in the DMSG, probably writing right after `Dial/Accept` -// causes this. -// 1. Test has possibility to pass, this means the test itself is correct -// 2. Test failure always comes with unexpected `context deadline exceeded`. In `read` operation of -// `setup proto` we ensure additional timeout, that's where this error comes from. This fact proves that -// DMSG has a related bug -// 3. Hanging may be not the problem of the DMSG. Probably some of the communication part here is wrong. -// The reason I think so is that - if we ensure read timeouts, why doesn't this test constantly fail? -// Maybe some wrapper for DMSG is wrong, or some internal operations before the actual communication behave bad func TestNode(t *testing.T) { - // We are generating two key pairs - one for the a `Router`, the other to send packets to `Router`. + // We are generating five key pairs - one for the `Router` of setup node, + // the other ones - for the clients along the desired route. keys := snettest.GenKeyPairs(5) // create test env nEnv := snettest.NewEnv(t, keys) defer nEnv.Teardown() - // Prepare dmsg server. - server, serverErr := createServer(t, nEnv.DmsgD) - defer func() { - require.NoError(t, server.Close()) - require.NoError(t, errWithTimeout(serverErr)) - }() - type clientWithDMSGAddrAndListener struct { *dmsg.Client Addr dmsg.Addr Listener *dmsg.Listener - RPCServer *rpc.Server - Router router.Router AppliedIntermediaryRules []routing.Rule AppliedEdgeRules routing.EdgeRules } @@ -119,50 +95,37 @@ func TestNode(t *testing.T) { fmt.Printf("Client %d PK: %s\n", i, clients[i].Addr.PK) r := &router.MockRouter{} - // for intermediary nodes and the destination one - if i >= 1 { - // passing two rules to each node (forward and reverse routes) + // exclude setup node + if i > 0 { + idx := i + // passing two rules to each node (forward and reverse routes). Simulate + // applying intermediary rules. r.On("SaveRoutingRules", mock.Anything, mock.Anything). Return(func(rules ...routing.Rule) error { - clients[i].AppliedIntermediaryRules = append(clients[i].AppliedIntermediaryRules, rules...) + clients[idx].AppliedIntermediaryRules = append(clients[idx].AppliedIntermediaryRules, rules...) return nil }) + // simulate reserving IDs. r.On("ReserveKeys", 2).Return(reservedIDs, testhelpers.NoErr) + // destination node. Simulate applying edge rules. if i == (n - 1) { r.On("IntroduceRules", mock.Anything).Return(func(rules routing.EdgeRules) error { - clients[i].AppliedEdgeRules = rules + clients[idx].AppliedEdgeRules = rules return nil }) } - } - clients[i].Router = r - if i != 0 { - rpcGateway := router.NewRPCGateway(r) rpcServer := rpc.NewServer() - err = rpcServer.Register(rpcGateway) + err = rpcServer.Register(router.NewRPCGateway(r)) require.NoError(t, err) - clients[i].RPCServer = rpcServer - go func(idx int) { - for { - conn, err := listener.Accept() - if err != nil { - //fmt.Printf("Error accepting RPC conn: %v\n", err) - continue - } - - //fmt.Println("Accepted RPC conn") - go clients[idx].RPCServer.ServeConn(conn) - } - }(i) - //go clients[i].RPCServer.Accept(listener) + go rpcServer.Accept(listener) } } + return clients, func() { for _, c := range clients { - //require.NoError(t, c.Listener.Close()) require.NoError(t, c.Close()) } } @@ -186,6 +149,41 @@ func TestNode(t *testing.T) { } } + // generates forward and reverse routes for the bidirectional one. + generateForwardAndReverseRoutes := func(route routing.BidirectionalRoute) (routing.Route, routing.Route) { + forwardRoute := routing.Route{ + Desc: route.Desc, + Path: route.Forward, + KeepAlive: route.KeepAlive, + } + reverseRoute := routing.Route{ + Desc: route.Desc.Invert(), + Path: route.Reverse, + KeepAlive: route.KeepAlive, + } + + return forwardRoute, reverseRoute + } + + // generates wanted rules. + generateRules := func( + t *testing.T, + route routing.BidirectionalRoute, + reservedIDs []routing.RouteID, + ) (forwardRules, consumeRules map[cipher.PubKey]routing.Rule, intermediaryRules RulesMap) { + wantIDR, _ := newIDReservoir(route.Forward, route.Reverse) + for pk := range wantIDR.rec { + wantIDR.ids[pk] = reservedIDs + } + + forwardRoute, reverseRoute := generateForwardAndReverseRoutes(route) + + forwardRules, consumeRules, intermediaryRules, err := wantIDR.GenerateRules(forwardRoute, reverseRoute) + require.NoError(t, err) + + return forwardRules, consumeRules, intermediaryRules + } + // TEST: Emulates the communication between 4 visor nodes and a setup node, // where the first client node initiates a route to the last. t.Run("DialRouteGroup", func(t *testing.T) { @@ -195,11 +193,9 @@ func TestNode(t *testing.T) { defer closeClients() // prepare and serve setup node (using client 0). - sn, closeSetup := prepSetupNode(clients[0].Client, clients[0].Listener) + _, closeSetup := prepSetupNode(clients[0].Client, clients[0].Listener) defer closeSetup() - //setupPK := clients[0].Addr.PK - // prepare loop creation (client_1 will use this to request loop creation with setup node). desc := routing.NewRouteDescriptor(clients[1].Addr.PK, clients[4].Addr.PK, 1, 1) @@ -221,156 +217,40 @@ func TestNode(t *testing.T) { Forward: forwardHops, Reverse: reverseHops, } - ///////////////////////////////////////////////////////////////// - /*reservoir, total := newIDReservoir(route.Forward, route.Reverse) - sn.logger.Infof("There are %d route IDs to reserve.", total) - - err := reservoir.ReserveIDs(ctx, sn.logger, sn.dmsgC, routerclient.ReserveIDs) - require.NoError(t, err) - sn.logger.Infof("Successfully reserved route IDs.")*/ - /////////////////////////////////////////////////////////////////////////// - //logger := logging.MustGetLogger("setup_client_test") + forwardRules, consumeRules, intermediaryRules := generateRules(t, route, reservedIDs) - //gotEdgeRules, err := setupclient.DialRouteGroup(ctx, logger, nEnv.Nets[1], []cipher.PubKey{setupPK}, route) - gotEdgeRules, err := sn.handleDialRouteGroup(context.Background(), route) - require.NoError(t, err) + forwardRoute, reverseRoute := generateForwardAndReverseRoutes(route) - wantEdgeRules := routing.EdgeRules{ // TODO: fill with correct values - Desc: desc, - Forward: nil, - Reverse: nil, + wantEdgeRules := routing.EdgeRules{ + Desc: forwardRoute.Desc, + Forward: forwardRules[route.Desc.SrcPK()], + Reverse: consumeRules[route.Desc.SrcPK()], } - require.Equal(t, wantEdgeRules, gotEdgeRules) - - /* - var addRuleDone sync.WaitGroup - var nextRouteID uint32 - // CLOSURE: emulates how a visor node should react when expecting an AddRules packet. - expectAddRules := func(client int, expRule routing.RuleType) { - conn, err := clients[client].Listener.Accept() - require.NoError(t, err) - - fmt.Printf("client %v:%v accepted\n", client, clients[client].Addr) - proto := NewSetupProtocol(conn) - - pt, _, err := proto.ReadPacket() - require.NoError(t, err) - require.Equal(t, PacketRequestRouteID, pt) - - fmt.Printf("client %v:%v got PacketRequestRouteID\n", client, clients[client].Addr) - - routeID := atomic.AddUint32(&nextRouteID, 1) - - // TODO: This error is not checked due to a bug in dmsg. - _ = proto.WritePacket(RespSuccess, []routing.RouteID{routing.RouteID(routeID)}) // nolint:errcheck - require.NoError(t, err) - - fmt.Printf("client %v:%v responded to with registration ID: %v\n", client, clients[client].Addr, routeID) - - require.NoError(t, conn.Close()) - - conn, err = clients[client].Listener.Accept() - require.NoError(t, err) - - fmt.Printf("client %v:%v accepted 2nd time\n", client, clients[client].Addr) - - proto = NewSetupProtocol(conn) - - pt, pp, err := proto.ReadPacket() - require.NoError(t, err) - require.Equal(t, PacketAddRules, pt) - - fmt.Printf("client %v:%v got PacketAddRules\n", client, clients[client].Addr) - - var rs []routing.Rule - require.NoError(t, json.Unmarshal(pp, &rs)) + gotEdgeRules, err := setupclient.DialRouteGroup(ctx, logging.MustGetLogger("setupclient_test"), nEnv.Nets[1], []cipher.PubKey{clients[0].Addr.PK}, route) + require.NoError(t, err) + require.Equal(t, wantEdgeRules, gotEdgeRules) - for _, r := range rs { - require.Equal(t, expRule, r.Type()) + for pk, rules := range intermediaryRules { + for _, cl := range clients { + if cl.Addr.PK == pk { + require.Equal(t, cl.AppliedIntermediaryRules, rules) + break } - - // TODO: This error is not checked due to a bug in dmsg. - err = proto.WritePacket(RespSuccess, nil) - _ = err - - fmt.Printf("client %v:%v responded for PacketAddRules\n", client, clients[client].Addr) - - require.NoError(t, conn.Close()) - - addRuleDone.Done() } + } - // CLOSURE: emulates how a visor node should react when expecting an OnConfirmLoop packet. - expectConfirmLoop := func(client int) { - tp, err := clients[client].Listener.AcceptTransport() - require.NoError(t, err) - - proto := NewSetupProtocol(tp) - - pt, pp, err := proto.ReadPacket() - require.NoError(t, err) - require.Equal(t, PacketConfirmLoop, pt) - - var d routing.LoopData - require.NoError(t, json.Unmarshal(pp, &d)) - - switch client { - case 1: - require.Equal(t, ld.Loop, d.Loop) - case 4: - require.Equal(t, ld.Loop.Local, d.Loop.Remote) - require.Equal(t, ld.Loop.Remote, d.Loop.Local) - default: - t.Fatalf("We shouldn't be receiving a OnConfirmLoop packet from client %d", client) - } - - // TODO: This error is not checked due to a bug in dmsg. - err = proto.WritePacket(RespSuccess, nil) - _ = err - - require.NoError(t, tp.Close()) - } + respRouteRules := routing.EdgeRules{ + Desc: reverseRoute.Desc, + Forward: forwardRules[route.Desc.DstPK()], + Reverse: consumeRules[route.Desc.DstPK()], + } - // since the route establishment is asynchronous, - // we must expect all the messages in parallel - addRuleDone.Add(4) - go expectAddRules(4, routing.RuleApp) - go expectAddRules(3, routing.RuleForward) - go expectAddRules(2, routing.RuleForward) - go expectAddRules(1, routing.RuleForward) - addRuleDone.Wait() - fmt.Println("FORWARD ROUTE DONE") - addRuleDone.Add(4) - go expectAddRules(1, routing.RuleApp) - go expectAddRules(2, routing.RuleForward) - go expectAddRules(3, routing.RuleForward) - go expectAddRules(4, routing.RuleForward) - addRuleDone.Wait() - fmt.Println("REVERSE ROUTE DONE") - expectConfirmLoop(1) - expectConfirmLoop(4) - - */ + require.Equal(t, respRouteRules, clients[4].AppliedEdgeRules) }) } -func createServer(t *testing.T, dc disc.APIClient) (srv *dmsg.Server, srvErr <-chan error) { - pk, sk, err := cipher.GenerateDeterministicKeyPair([]byte("s")) - require.NoError(t, err) - l, err := nettest.NewLocalListener("tcp") - require.NoError(t, err) - srv, err = dmsg.NewServer(pk, sk, "", l, dc) - require.NoError(t, err) - errCh := make(chan error, 1) - go func() { - errCh <- srv.Serve() - close(errCh) - }() - return srv, errCh -} - func errWithTimeout(ch <-chan error) error { select { case err := <-ch: