
Commit

[WIP] Fixing setup tests
nkryuchkov committed Nov 18, 2019
1 parent 76b7ffd commit 932e5d4
Showing 1 changed file with 155 additions and 144 deletions.
299 changes: 155 additions & 144 deletions pkg/setup/node_test.go
@@ -12,20 +12,22 @@ import (
"testing"
"time"

"github.com/SkycoinProject/skywire-mainnet/pkg/router/routerclient"
"github.com/SkycoinProject/skywire-mainnet/pkg/setup/setupclient"
"github.com/SkycoinProject/skywire-mainnet/pkg/snet/snettest"

"github.com/SkycoinProject/dmsg"
"github.com/SkycoinProject/dmsg/cipher"
"github.com/SkycoinProject/dmsg/disc"
"github.com/SkycoinProject/skycoin/src/util/logging"
"github.com/SkycoinProject/skywire-mainnet/internal/skyenv"
"github.com/SkycoinProject/skywire-mainnet/pkg/metrics"
"github.com/SkycoinProject/skywire-mainnet/pkg/router"
"github.com/SkycoinProject/skywire-mainnet/pkg/routing"
"github.com/google/uuid"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
"golang.org/x/net/nettest"

"github.com/SkycoinProject/skywire-mainnet/internal/skyenv"
"github.com/SkycoinProject/skywire-mainnet/pkg/metrics"
"github.com/SkycoinProject/skywire-mainnet/pkg/router"
"github.com/SkycoinProject/skywire-mainnet/pkg/routing"
)

func TestMain(m *testing.M) {
@@ -59,11 +61,15 @@ func TestMain(m *testing.M) {
// The reason I think so: if we really enforce read timeouts, why doesn't this test fail constantly?
// Maybe some wrapper for DMSG is wrong, or some internal operations before the actual communication behave badly.
func TestNode(t *testing.T) {
// Prepare mock dmsg discovery.
discovery := disc.NewMock()
// We are generating five key pairs - one for the setup node, the rest for the visor nodes.
keys := snettest.GenKeyPairs(5)

// create test env
nEnv := snettest.NewEnv(t, keys)
defer nEnv.Teardown()
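// nEnv provides the mock dmsg discovery (nEnv.DmsgD) and the snet networks (nEnv.Nets) that the dmsg clients and the setup client below rely on.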

// Prepare dmsg server.
server, serverErr := createServer(t, discovery)
server, serverErr := createServer(t, nEnv.DmsgD)
defer func() {
require.NoError(t, server.Close())
require.NoError(t, errWithTimeout(serverErr))
@@ -79,6 +85,8 @@ func TestNode(t *testing.T) {
AppliedEdgeRules routing.EdgeRules
}

ctx := context.TODO()

// CLOSURE: sets up dmsg clients.
prepClients := func(n int) ([]clientWithDMSGAddrAndListener, func()) {
clients := make([]clientWithDMSGAddrAndListener, n)
@@ -90,11 +98,10 @@ func TestNode(t *testing.T) {
} else {
port = skyenv.DmsgAwaitSetupPort
}
pk, sk, err := cipher.GenerateDeterministicKeyPair([]byte{byte(i)})
require.NoError(t, err)
pk, sk := keys[i].PK, keys[i].SK
t.Logf("client[%d] PK: %s\n", i, pk)
c := dmsg.NewClient(pk, sk, discovery, dmsg.SetLogger(logging.MustGetLogger(fmt.Sprintf("client_%d:%s:%d", i, pk, port))))
require.NoError(t, c.InitiateServerConnections(context.TODO(), 1))
c := dmsg.NewClient(pk, sk, nEnv.DmsgD, dmsg.SetLogger(logging.MustGetLogger(fmt.Sprintf("client_%d:%s:%d", i, pk, port))))
require.NoError(t, c.InitiateServerConnections(ctx, 1))
listener, err := c.Listen(port)
require.NoError(t, err)
clients[i] = clientWithDMSGAddrAndListener{
@@ -173,160 +180,164 @@ func TestNode(t *testing.T) {
}

// TEST: Emulates the communication between 4 visor nodes and a setup node,
// where the first client node initiates a loop to the last.
t.Run("CreateRoutes", func(t *testing.T) {
// where the first client node initiates a route to the last.
t.Run("DialRouteGroup", func(t *testing.T) {
// client index 0 is for the setup node.
// clients with indices 1 to 4 are for visor nodes.
clients, closeClients := prepClients(5)
defer closeClients()
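// The block below reserves a single route ID on client 1's router via the setup node's dmsg client (clients[0].Client).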

_, err := routerclient.ReserveIDs(context.Background(), logging.MustGetLogger("dick"), clients[0].Client, clients[1].Addr.PK, 1)
require.NoError(t, err)
fmt.Println("Got IDs")

time.Sleep(1 * time.Hour)

// prepare and serve setup node (using client 0).
sn, closeSetup := prepSetupNode(clients[0].Client, clients[0].Listener)
//setupPK := clients[0].Addr.PK
//setupPort := clients[0].Addr.Port
_, closeSetup := prepSetupNode(clients[0].Client, clients[0].Listener)
defer closeSetup()
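// The setup node serves on client 0's dmsg listener, so visors reach it via clients[0].Addr.PK.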

// client_1 initiates loop creation with setup node.
//iTp, err := clients[1].Dial(context.TODO(), setupPK, setupPort)
//require.NoError(t, err)
iTpErrs := make(chan error)
edgeRulesCh := make(chan routing.EdgeRules)
go func() {
fwdRule := routing.Path{
{
TpID: uuid.New(),
From: clients[1].Addr.PK,
To: clients[2].Addr.PK,
},
{
TpID: uuid.New(),
From: clients[2].Addr.PK,
To: clients[3].Addr.PK,
},
{
TpID: uuid.New(),
From: clients[3].Addr.PK,
To: clients[4].Addr.PK,
},
}
rvRule := routing.Path{
{
TpID: uuid.New(),
From: clients[4].Addr.PK,
To: clients[3].Addr.PK,
},
{
TpID: uuid.New(),
From: clients[3].Addr.PK,
To: clients[2].Addr.PK,
},
{
TpID: uuid.New(),
From: clients[2].Addr.PK,
To: clients[1].Addr.PK,
},
}
setupPK := clients[0].Addr.PK

ctx := context.Background()
desc := routing.NewRouteDescriptor(clients[1].Addr.PK, clients[4].Addr.PK, routing.Port(clients[1].Addr.Port), routing.Port(clients[4].Addr.Port))
route := routing.BidirectionalRoute{
Desc: desc,
KeepAlive: 1 * time.Hour,
Forward: fwdRule,
Reverse: rvRule,
}
// prepare loop creation (client_1 will use this to request loop creation with setup node).
desc := routing.NewRouteDescriptor(clients[1].Addr.PK, clients[4].Addr.PK, 1, 1)

rules, err := sn.handleDialRouteGroup(ctx, route)
iTpErrs <- err
close(iTpErrs)
edgeRulesCh <- rules
close(edgeRulesCh)
}()
defer func() {
i := 0
for err := range iTpErrs {
require.NoError(t, err, i)
i++
}
forwardHops := []routing.Hop{
{From: clients[1].Addr.PK, To: clients[2].Addr.PK, TpID: uuid.New()},
{From: clients[2].Addr.PK, To: clients[3].Addr.PK, TpID: uuid.New()},
{From: clients[3].Addr.PK, To: clients[4].Addr.PK, TpID: uuid.New()},
}

rules := <-edgeRulesCh
require.Equal(t, routing.NewRouteDescriptor(clients[1].Addr.PK, clients[4].Addr.PK, routing.Port(clients[1].Addr.Port), routing.Port(clients[4].Addr.Port)), rules.Desc)
}()
})
reverseHops := []routing.Hop{
{From: clients[4].Addr.PK, To: clients[3].Addr.PK, TpID: uuid.New()},
{From: clients[3].Addr.PK, To: clients[2].Addr.PK, TpID: uuid.New()},
{From: clients[2].Addr.PK, To: clients[1].Addr.PK, TpID: uuid.New()},
}
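// The forward hops run client 1 -> 2 -> 3 -> 4 and the reverse hops run 4 -> 3 -> 2 -> 1, each hop with its own transport ID.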

// TEST: Emulates the communication between 2 visor nodes and a setup node,
// where a route is already established,
// and the first client attempts to tear it down.
/*t.Run("CloseLoop", func(t *testing.T) {
// client index 0 is for setup node.
// clients index 1 and 2 are for visor nodes.
clients, closeClients := prepClients(3)
defer closeClients()
route := routing.BidirectionalRoute{
Desc: desc,
KeepAlive: 1 * time.Hour,
Forward: forwardHops,
Reverse: reverseHops,
}
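// The bidirectional route bundles the descriptor, both hop lists and a keep-alive interval; client 1 sends it to the setup node below.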

// prepare and serve setup node.
_, closeSetup := prepSetupNode(clients[0].Client, clients[0].Listener)
setupPK := clients[0].Addr.PK
setupPort := clients[0].Addr.Port
defer closeSetup()
logger := logging.MustGetLogger("setup_client_test")

// prepare loop data describing the loop that is to be closed.
ld := routing.LoopData{
Loop: routing.Loop{
Local: routing.Addr{
PubKey: clients[1].Addr.PK,
Port: 1,
},
Remote: routing.Addr{
PubKey: clients[2].Addr.PK,
Port: 2,
},
},
RouteID: 3,
gotEdgeRules, err := setupclient.DialRouteGroup(ctx, logger, nEnv.Nets[1], []cipher.PubKey{setupPK}, route)
require.NoError(t, err)
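// At this point the setup node, dialed over nEnv.Nets[1], has returned the edge rules for the requested route.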

wantEdgeRules := routing.EdgeRules{ // TODO: fill with correct values
Desc: desc,
Forward: nil,
Reverse: nil,
}
require.Equal(t, wantEdgeRules, gotEdgeRules)

/*
var addRuleDone sync.WaitGroup
var nextRouteID uint32
// CLOSURE: emulates how a visor node should react when expecting an AddRules packet.
expectAddRules := func(client int, expRule routing.RuleType) {
conn, err := clients[client].Listener.Accept()
require.NoError(t, err)
// client_1 initiates close loop with setup node.
iTp, err := clients[1].Dial(context.TODO(), setupPK, setupPort)
require.NoError(t, err)
iTpErrs := make(chan error, 2)
go func() {
iTpErrs <- CloseLoop(context.TODO(), NewSetupProtocol(iTp), ld)
iTpErrs <- iTp.Close()
close(iTpErrs)
}()
defer func() {
i := 0
for err := range iTpErrs {
require.NoError(t, err, i)
i++
fmt.Printf("client %v:%v accepted\n", client, clients[client].Addr)
proto := NewSetupProtocol(conn)
pt, _, err := proto.ReadPacket()
require.NoError(t, err)
require.Equal(t, PacketRequestRouteID, pt)
fmt.Printf("client %v:%v got PacketRequestRouteID\n", client, clients[client].Addr)
routeID := atomic.AddUint32(&nextRouteID, 1)
// TODO: This error is not checked due to a bug in dmsg.
_ = proto.WritePacket(RespSuccess, []routing.RouteID{routing.RouteID(routeID)}) // nolint:errcheck
require.NoError(t, err)
fmt.Printf("client %v:%v responded to with registration ID: %v\n", client, clients[client].Addr, routeID)
require.NoError(t, conn.Close())
conn, err = clients[client].Listener.Accept()
require.NoError(t, err)
fmt.Printf("client %v:%v accepted 2nd time\n", client, clients[client].Addr)
proto = NewSetupProtocol(conn)
pt, pp, err := proto.ReadPacket()
require.NoError(t, err)
require.Equal(t, PacketAddRules, pt)
fmt.Printf("client %v:%v got PacketAddRules\n", client, clients[client].Addr)
var rs []routing.Rule
require.NoError(t, json.Unmarshal(pp, &rs))
for _, r := range rs {
require.Equal(t, expRule, r.Type())
}
// TODO: This error is not checked due to a bug in dmsg.
err = proto.WritePacket(RespSuccess, nil)
_ = err
fmt.Printf("client %v:%v responded for PacketAddRules\n", client, clients[client].Addr)
require.NoError(t, conn.Close())
addRuleDone.Done()
}
}()
// client_2 accepts close request.
tp, err := clients[2].Listener.AcceptTransport()
require.NoError(t, err)
defer func() { require.NoError(t, tp.Close()) }()
// CLOSURE: emulates how a visor node should react when expecting an OnConfirmLoop packet.
expectConfirmLoop := func(client int) {
tp, err := clients[client].Listener.AcceptTransport()
require.NoError(t, err)
proto := NewSetupProtocol(tp)
proto := NewSetupProtocol(tp)
pt, pp, err := proto.ReadPacket()
require.NoError(t, err)
require.Equal(t, PacketLoopClosed, pt)
pt, pp, err := proto.ReadPacket()
require.NoError(t, err)
require.Equal(t, PacketConfirmLoop, pt)
var d routing.LoopData
require.NoError(t, json.Unmarshal(pp, &d))
switch client {
case 1:
require.Equal(t, ld.Loop, d.Loop)
case 4:
require.Equal(t, ld.Loop.Local, d.Loop.Remote)
require.Equal(t, ld.Loop.Remote, d.Loop.Local)
default:
t.Fatalf("We shouldn't be receiving a OnConfirmLoop packet from client %d", client)
}
// TODO: This error is not checked due to a bug in dmsg.
err = proto.WritePacket(RespSuccess, nil)
_ = err
var d routing.LoopData
require.NoError(t, json.Unmarshal(pp, &d))
require.Equal(t, ld.Loop.Remote, d.Loop.Local)
require.Equal(t, ld.Loop.Local, d.Loop.Remote)
require.NoError(t, tp.Close())
}
// TODO: This error is not checked due to a bug in dmsg.
err = proto.WritePacket(RespSuccess, nil)
_ = err
})*/
// since the route establishment is asynchronous,
// we must expect all the messages in parallel
addRuleDone.Add(4)
go expectAddRules(4, routing.RuleApp)
go expectAddRules(3, routing.RuleForward)
go expectAddRules(2, routing.RuleForward)
go expectAddRules(1, routing.RuleForward)
addRuleDone.Wait()
fmt.Println("FORWARD ROUTE DONE")
addRuleDone.Add(4)
go expectAddRules(1, routing.RuleApp)
go expectAddRules(2, routing.RuleForward)
go expectAddRules(3, routing.RuleForward)
go expectAddRules(4, routing.RuleForward)
addRuleDone.Wait()
fmt.Println("REVERSE ROUTE DONE")
expectConfirmLoop(1)
expectConfirmLoop(4)
*/
})
}

func createServer(t *testing.T, dc disc.APIClient) (srv *dmsg.Server, srvErr <-chan error) {
