
Commit

Fix setup node test
Darkren committed Nov 18, 2019
1 parent acf0742 commit ed08a65
Showing 2 changed files with 78 additions and 208 deletions.
14 changes: 2 additions & 12 deletions pkg/setup/idreservoir.go
@@ -55,7 +55,7 @@ func (idr *idReservoir) ReserveIDs(ctx context.Context, log *logging.Logger, dms

for pk, n := range idr.rec {
pk, n := pk, n
/*go func() {
go func() {
ids, err := reserve(ctx, log, dmsgC, pk, n)
if err != nil {
errCh <- fmt.Errorf("reserve routeID from %s failed: %v", pk, err)
@@ -65,17 +65,7 @@ func (idr *idReservoir) ReserveIDs(ctx context.Context, log *logging.Logger, dms
idr.ids[pk] = ids
idr.mx.Unlock()
errCh <- nil
}()*/
fmt.Printf("Trying to ReserveIDs from %s\n", pk)
ids, err := reserve(ctx, log, dmsgC, pk, n)
if err != nil {
errCh <- fmt.Errorf("reserve routeID from %s failed: %v", pk, err)
continue
}
idr.mx.Lock()
idr.ids[pk] = ids
idr.mx.Unlock()
errCh <- nil
}()
}

return finalError(len(idr.rec), errCh)
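For context, the hunk above re-enables a fan-out pattern: one goroutine per peer reserves route IDs, each goroutine sends exactly one result on a shared error channel, and the caller collects one result per peer before returning. Below is a minimal, self-contained Go sketch of that pattern; the names (`reserve`, `reserveIDs`) and types are hypothetical stand-ins, not the repository's actual implementation.

package main

import (
    "context"
    "fmt"
    "sync"
    "time"
)

// reserve is a hypothetical stand-in for the RPC that reserves n route IDs from peer pk.
func reserve(ctx context.Context, pk string, n int) ([]int, error) {
    select {
    case <-ctx.Done():
        return nil, ctx.Err()
    case <-time.After(10 * time.Millisecond):
    }
    ids := make([]int, n)
    for i := range ids {
        ids[i] = i + 1
    }
    return ids, nil
}

// reserveIDs fans out one goroutine per peer; each goroutine sends exactly one
// value on errCh, and the caller drains len(peers) results before returning.
func reserveIDs(ctx context.Context, peers map[string]int) (map[string][]int, error) {
    var mx sync.Mutex
    ids := make(map[string][]int)
    errCh := make(chan error, len(peers))

    for pk, n := range peers {
        pk, n := pk, n // capture loop variables for the goroutine
        go func() {
            reserved, err := reserve(ctx, pk, n)
            if err != nil {
                errCh <- fmt.Errorf("reserve routeID from %s failed: %v", pk, err)
                return
            }
            mx.Lock()
            ids[pk] = reserved
            mx.Unlock()
            errCh <- nil
        }()
    }

    // collect one result per peer and keep the first error, if any
    var firstErr error
    for range peers {
        if err := <-errCh; err != nil && firstErr == nil {
            firstErr = err
        }
    }
    return ids, firstErr
}

func main() {
    ids, err := reserveIDs(context.Background(), map[string]int{"pk-a": 2, "pk-b": 3})
    fmt.Println(ids, err)
}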
272 changes: 76 additions & 196 deletions pkg/setup/node_test.go
@@ -12,23 +12,22 @@ import (
"testing"
"time"

"github.com/SkycoinProject/skywire-mainnet/pkg/setup/setupclient"

"github.com/SkycoinProject/skywire-mainnet/internal/testhelpers"

"github.com/SkycoinProject/skywire-mainnet/pkg/snet/snettest"

"github.com/SkycoinProject/dmsg"
"github.com/SkycoinProject/dmsg/cipher"
"github.com/SkycoinProject/dmsg/disc"
"github.com/SkycoinProject/skycoin/src/util/logging"
"github.com/google/uuid"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
"golang.org/x/net/nettest"

"github.com/SkycoinProject/skywire-mainnet/internal/skyenv"
"github.com/SkycoinProject/skywire-mainnet/pkg/metrics"
"github.com/SkycoinProject/skywire-mainnet/pkg/router"
"github.com/SkycoinProject/skywire-mainnet/pkg/routing"
"github.com/google/uuid"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
)

func TestMain(m *testing.M) {
@@ -46,42 +45,19 @@ func TestMain(m *testing.M) {
os.Exit(m.Run())
}

// TODO(Darkren): fix this test. Explanation below.
// The test may finish in 3 different ways:
// 1. Pass
// 2. Fail
// 3. Hang
// Adding a `time.Sleep` at the start of the `Write` operation in DMSG makes it less likely to hang.
// From observations it seems like something is wrong in DMSG; probably writing right after `Dial/Accept`
// causes this.
// 1. The test has a chance to pass, which means the test itself is correct.
// 2. Test failure always comes with an unexpected `context deadline exceeded`. In the `read` operation of
// the `setup proto` we ensure an additional timeout, and that's where this error comes from. This fact proves that
// DMSG has a related bug.
// 3. Hanging may not be a problem of DMSG. Probably some part of the communication here is wrong.
// The reason I think so is that if we ensure read timeouts, why doesn't this test constantly fail?
// Maybe some wrapper for DMSG is wrong, or some internal operations before the actual communication behave badly.
func TestNode(t *testing.T) {
// We are generating two key pairs - one for a `Router`, the other to send packets to the `Router`.
// We are generating five key pairs - one for the `Router` of the setup node,
// the others for the clients along the desired route.
keys := snettest.GenKeyPairs(5)

// create test env
nEnv := snettest.NewEnv(t, keys)
defer nEnv.Teardown()

// Prepare dmsg server.
server, serverErr := createServer(t, nEnv.DmsgD)
defer func() {
require.NoError(t, server.Close())
require.NoError(t, errWithTimeout(serverErr))
}()

type clientWithDMSGAddrAndListener struct {
*dmsg.Client
Addr dmsg.Addr
Listener *dmsg.Listener
RPCServer *rpc.Server
Router router.Router
AppliedIntermediaryRules []routing.Rule
AppliedEdgeRules routing.EdgeRules
}
@@ -119,50 +95,37 @@ func TestNode(t *testing.T) {
fmt.Printf("Client %d PK: %s\n", i, clients[i].Addr.PK)

r := &router.MockRouter{}
// for intermediary nodes and the destination one
if i >= 1 {
// passing two rules to each node (forward and reverse routes)
// exclude setup node
if i > 0 {
idx := i
// passing two rules to each node (forward and reverse routes). Simulate
// applying intermediary rules.
r.On("SaveRoutingRules", mock.Anything, mock.Anything).
Return(func(rules ...routing.Rule) error {
clients[i].AppliedIntermediaryRules = append(clients[i].AppliedIntermediaryRules, rules...)
clients[idx].AppliedIntermediaryRules = append(clients[idx].AppliedIntermediaryRules, rules...)
return nil
})

// simulate reserving IDs.
r.On("ReserveKeys", 2).Return(reservedIDs, testhelpers.NoErr)

// destination node. Simulate applying edge rules.
if i == (n - 1) {
r.On("IntroduceRules", mock.Anything).Return(func(rules routing.EdgeRules) error {
clients[i].AppliedEdgeRules = rules
clients[idx].AppliedEdgeRules = rules
return nil
})
}
}

clients[i].Router = r
if i != 0 {
rpcGateway := router.NewRPCGateway(r)
rpcServer := rpc.NewServer()
err = rpcServer.Register(rpcGateway)
err = rpcServer.Register(router.NewRPCGateway(r))
require.NoError(t, err)
clients[i].RPCServer = rpcServer
go func(idx int) {
for {
conn, err := listener.Accept()
if err != nil {
//fmt.Printf("Error accepting RPC conn: %v\n", err)
continue
}

//fmt.Println("Accepted RPC conn")
go clients[idx].RPCServer.ServeConn(conn)
}
}(i)
//go clients[i].RPCServer.Accept(listener)
go rpcServer.Accept(listener)
}
}

return clients, func() {
for _, c := range clients {
//require.NoError(t, c.Listener.Close())
require.NoError(t, c.Close())
}
}
@@ -186,6 +149,41 @@ func TestNode(t *testing.T) {
}
}

// generates forward and reverse routes from the bidirectional route.
generateForwardAndReverseRoutes := func(route routing.BidirectionalRoute) (routing.Route, routing.Route) {
forwardRoute := routing.Route{
Desc: route.Desc,
Path: route.Forward,
KeepAlive: route.KeepAlive,
}
reverseRoute := routing.Route{
Desc: route.Desc.Invert(),
Path: route.Reverse,
KeepAlive: route.KeepAlive,
}

return forwardRoute, reverseRoute
}

// generates wanted rules.
generateRules := func(
t *testing.T,
route routing.BidirectionalRoute,
reservedIDs []routing.RouteID,
) (forwardRules, consumeRules map[cipher.PubKey]routing.Rule, intermediaryRules RulesMap) {
wantIDR, _ := newIDReservoir(route.Forward, route.Reverse)
for pk := range wantIDR.rec {
wantIDR.ids[pk] = reservedIDs
}

forwardRoute, reverseRoute := generateForwardAndReverseRoutes(route)

forwardRules, consumeRules, intermediaryRules, err := wantIDR.GenerateRules(forwardRoute, reverseRoute)
require.NoError(t, err)

return forwardRules, consumeRules, intermediaryRules
}

// TEST: Emulates the communication between 4 visor nodes and a setup node,
// where the first client node initiates a route to the last.
t.Run("DialRouteGroup", func(t *testing.T) {
@@ -195,11 +193,9 @@ func TestNode(t *testing.T) {
defer closeClients()

// prepare and serve setup node (using client 0).
sn, closeSetup := prepSetupNode(clients[0].Client, clients[0].Listener)
_, closeSetup := prepSetupNode(clients[0].Client, clients[0].Listener)
defer closeSetup()

//setupPK := clients[0].Addr.PK

// prepare loop creation (client_1 will use this to request loop creation with setup node).
desc := routing.NewRouteDescriptor(clients[1].Addr.PK, clients[4].Addr.PK, 1, 1)

@@ -221,156 +217,40 @@ func TestNode(t *testing.T) {
Forward: forwardHops,
Reverse: reverseHops,
}
/////////////////////////////////////////////////////////////////
/*reservoir, total := newIDReservoir(route.Forward, route.Reverse)
sn.logger.Infof("There are %d route IDs to reserve.", total)
err := reservoir.ReserveIDs(ctx, sn.logger, sn.dmsgC, routerclient.ReserveIDs)
require.NoError(t, err)

sn.logger.Infof("Successfully reserved route IDs.")*/
///////////////////////////////////////////////////////////////////////////
//logger := logging.MustGetLogger("setup_client_test")
forwardRules, consumeRules, intermediaryRules := generateRules(t, route, reservedIDs)

//gotEdgeRules, err := setupclient.DialRouteGroup(ctx, logger, nEnv.Nets[1], []cipher.PubKey{setupPK}, route)
gotEdgeRules, err := sn.handleDialRouteGroup(context.Background(), route)
require.NoError(t, err)
forwardRoute, reverseRoute := generateForwardAndReverseRoutes(route)

wantEdgeRules := routing.EdgeRules{ // TODO: fill with correct values
Desc: desc,
Forward: nil,
Reverse: nil,
wantEdgeRules := routing.EdgeRules{
Desc: forwardRoute.Desc,
Forward: forwardRules[route.Desc.SrcPK()],
Reverse: consumeRules[route.Desc.SrcPK()],
}
require.Equal(t, wantEdgeRules, gotEdgeRules)

/*
var addRuleDone sync.WaitGroup
var nextRouteID uint32
// CLOSURE: emulates how a visor node should react when expecting an AddRules packet.
expectAddRules := func(client int, expRule routing.RuleType) {
conn, err := clients[client].Listener.Accept()
require.NoError(t, err)
fmt.Printf("client %v:%v accepted\n", client, clients[client].Addr)

proto := NewSetupProtocol(conn)
pt, _, err := proto.ReadPacket()
require.NoError(t, err)
require.Equal(t, PacketRequestRouteID, pt)
fmt.Printf("client %v:%v got PacketRequestRouteID\n", client, clients[client].Addr)
routeID := atomic.AddUint32(&nextRouteID, 1)
// TODO: This error is not checked due to a bug in dmsg.
_ = proto.WritePacket(RespSuccess, []routing.RouteID{routing.RouteID(routeID)}) // nolint:errcheck
require.NoError(t, err)
fmt.Printf("client %v:%v responded to with registration ID: %v\n", client, clients[client].Addr, routeID)
require.NoError(t, conn.Close())
conn, err = clients[client].Listener.Accept()
require.NoError(t, err)
fmt.Printf("client %v:%v accepted 2nd time\n", client, clients[client].Addr)
proto = NewSetupProtocol(conn)
pt, pp, err := proto.ReadPacket()
require.NoError(t, err)
require.Equal(t, PacketAddRules, pt)
fmt.Printf("client %v:%v got PacketAddRules\n", client, clients[client].Addr)
var rs []routing.Rule
require.NoError(t, json.Unmarshal(pp, &rs))
gotEdgeRules, err := setupclient.DialRouteGroup(ctx, logging.MustGetLogger("setupclient_test"), nEnv.Nets[1], []cipher.PubKey{clients[0].Addr.PK}, route)
require.NoError(t, err)
require.Equal(t, wantEdgeRules, gotEdgeRules)

for _, r := range rs {
require.Equal(t, expRule, r.Type())
for pk, rules := range intermediaryRules {
for _, cl := range clients {
if cl.Addr.PK == pk {
require.Equal(t, cl.AppliedIntermediaryRules, rules)
break
}
// TODO: This error is not checked due to a bug in dmsg.
err = proto.WritePacket(RespSuccess, nil)
_ = err
fmt.Printf("client %v:%v responded for PacketAddRules\n", client, clients[client].Addr)
require.NoError(t, conn.Close())
addRuleDone.Done()
}
}

// CLOSURE: emulates how a visor node should react when expecting an OnConfirmLoop packet.
expectConfirmLoop := func(client int) {
tp, err := clients[client].Listener.AcceptTransport()
require.NoError(t, err)
proto := NewSetupProtocol(tp)
pt, pp, err := proto.ReadPacket()
require.NoError(t, err)
require.Equal(t, PacketConfirmLoop, pt)
var d routing.LoopData
require.NoError(t, json.Unmarshal(pp, &d))
switch client {
case 1:
require.Equal(t, ld.Loop, d.Loop)
case 4:
require.Equal(t, ld.Loop.Local, d.Loop.Remote)
require.Equal(t, ld.Loop.Remote, d.Loop.Local)
default:
t.Fatalf("We shouldn't be receiving a OnConfirmLoop packet from client %d", client)
}
// TODO: This error is not checked due to a bug in dmsg.
err = proto.WritePacket(RespSuccess, nil)
_ = err
require.NoError(t, tp.Close())
}
respRouteRules := routing.EdgeRules{
Desc: reverseRoute.Desc,
Forward: forwardRules[route.Desc.DstPK()],
Reverse: consumeRules[route.Desc.DstPK()],
}

// since the route establishment is asynchronous,
// we must expect all the messages in parallel
addRuleDone.Add(4)
go expectAddRules(4, routing.RuleApp)
go expectAddRules(3, routing.RuleForward)
go expectAddRules(2, routing.RuleForward)
go expectAddRules(1, routing.RuleForward)
addRuleDone.Wait()
fmt.Println("FORWARD ROUTE DONE")
addRuleDone.Add(4)
go expectAddRules(1, routing.RuleApp)
go expectAddRules(2, routing.RuleForward)
go expectAddRules(3, routing.RuleForward)
go expectAddRules(4, routing.RuleForward)
addRuleDone.Wait()
fmt.Println("REVERSE ROUTE DONE")
expectConfirmLoop(1)
expectConfirmLoop(4)
*/
require.Equal(t, respRouteRules, clients[4].AppliedEdgeRules)
})
}

func createServer(t *testing.T, dc disc.APIClient) (srv *dmsg.Server, srvErr <-chan error) {
pk, sk, err := cipher.GenerateDeterministicKeyPair([]byte("s"))
require.NoError(t, err)
l, err := nettest.NewLocalListener("tcp")
require.NoError(t, err)
srv, err = dmsg.NewServer(pk, sk, "", l, dc)
require.NoError(t, err)
errCh := make(chan error, 1)
go func() {
errCh <- srv.Serve()
close(errCh)
}()
return srv, errCh
}

func errWithTimeout(ch <-chan error) error {
select {
case err := <-ch:
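The test above captures the rules each mocked router receives by passing a function to `Return`; mockery-generated mocks such as `router.MockRouter` type-assert the stored return value and call it with the call's arguments. Below is a minimal sketch of that capture pattern, with a hand-rolled mock and a hypothetical `Rule` type standing in for the generated mock and `routing.Rule`.

package example

import (
    "testing"

    "github.com/stretchr/testify/mock"
    "github.com/stretchr/testify/require"
)

// Rule is a hypothetical stand-in for routing.Rule.
type Rule string

// MockRouter mimics the shape of a mockery-generated mock: when Return was given
// a function instead of a plain value, the mock invokes it with the call's
// arguments to compute the result.
type MockRouter struct {
    mock.Mock
}

func (m *MockRouter) SaveRoutingRules(rules ...Rule) error {
    args := m.Called(rules)
    if rf, ok := args.Get(0).(func(...Rule) error); ok {
        return rf(rules...)
    }
    return args.Error(0)
}

func TestCaptureAppliedRules(t *testing.T) {
    var applied []Rule

    r := &MockRouter{}
    // Returning a function lets the mock record its arguments as a side effect,
    // mirroring how the test accumulates AppliedIntermediaryRules.
    r.On("SaveRoutingRules", mock.Anything).
        Return(func(rules ...Rule) error {
            applied = append(applied, rules...)
            return nil
        })

    require.NoError(t, r.SaveRoutingRules("forward", "reverse"))
    require.Equal(t, []Rule{"forward", "reverse"}, applied)
}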
