Skip to content

Commit

Permalink
Adjust setup node test to the new DMSG API
Browse files Browse the repository at this point in the history
  • Loading branch information
Darkren committed Aug 18, 2019
1 parent a1d0c4e commit 0286073
Show file tree
Hide file tree
Showing 11 changed files with 672 additions and 348 deletions.
2 changes: 1 addition & 1 deletion pkg/setup/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -246,7 +246,7 @@ func (sn *Node) createRoute(ctx context.Context, expireAt time.Time, route routi

go func(idx int, pubKey cipher.PubKey, rule routing.Rule, regIDChIn <-chan routing.RouteID,
regIDChOut chan<- routing.RouteID) {
routeID, err := sn.addRule(ctx, pubKey, rule, regIDChIn, regIDChOut)
routeID, err := sn.setupRule(ctx, pubKey, rule, regIDChIn, regIDChOut)
if err != nil {
// filter out context cancellation errors
if err == context.Canceled {
Expand Down
77 changes: 53 additions & 24 deletions pkg/setup/node_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,16 +52,27 @@ func TestNode(t *testing.T) {
require.NoError(t, errWithTimeout(serverErr))
}()

type clientWithDMSGAddr struct {
*dmsg.Client
Addr dmsg.Addr
}

// CLOSURE: sets up dmsg clients.
prepClients := func(n int) ([]*dmsg.Client, func()) {
clients := make([]*dmsg.Client, n)
prepClients := func(n int) ([]clientWithDMSGAddr, func()) {
clients := make([]clientWithDMSGAddr, n)
for i := 0; i < n; i++ {
pk, sk, err := cipher.GenerateDeterministicKeyPair([]byte{byte(i)})
require.NoError(t, err)
t.Logf("client[%d] PK: %s\n", i, pk)
c := dmsg.NewClient(pk, sk, discovery, dmsg.SetLogger(logging.MustGetLogger(fmt.Sprintf("client_%d:%s", i, pk))))
require.NoError(t, c.InitiateServerConnections(context.TODO(), 1))
clients[i] = c
clients[i] = clientWithDMSGAddr{
Client: c,
Addr: dmsg.Addr{
PK: pk,
Port: uint16(15678 + i),
},
}
}
return clients, func() {
for _, c := range clients {
Expand All @@ -71,10 +82,13 @@ func TestNode(t *testing.T) {
}

// CLOSURE: sets up setup node.
prepSetupNode := func(c *dmsg.Client) (*Node, func()) {
prepSetupNode := func(c *dmsg.Client, listeningPort uint16) (*Node, func()) {
listener, err := c.Listen(listeningPort)
require.NoError(t, err)
sn := &Node{
Logger: logging.MustGetLogger("setup_node"),
dmsgC: c,
dmsgL: listener,
metrics: metrics.NewDummy(),
}
go func() { _ = sn.Serve(context.TODO()) }() //nolint:errcheck
Expand All @@ -92,31 +106,32 @@ func TestNode(t *testing.T) {
defer closeClients()

// prepare and serve setup node (using client 0).
sn, closeSetup := prepSetupNode(clients[0])
setupPK := sn.dmsgC.Local()
_, closeSetup := prepSetupNode(clients[0].Client, clients[0].Addr.Port)
setupPK := clients[0].Addr.PK
setupPort := clients[0].Addr.Port
defer closeSetup()

// prepare loop creation (client_1 will use this to request loop creation with setup node).
ld := routing.LoopDescriptor{
Loop: routing.Loop{
Local: routing.Addr{PubKey: clients[1].Local(), Port: 1},
Remote: routing.Addr{PubKey: clients[4].Local(), Port: 1},
Local: routing.Addr{PubKey: clients[1].Addr.PK, Port: 1},
Remote: routing.Addr{PubKey: clients[4].Addr.PK, Port: 1},
},
Reverse: routing.Route{
&routing.Hop{From: clients[1].Local(), To: clients[2].Local(), Transport: uuid.New()},
&routing.Hop{From: clients[2].Local(), To: clients[3].Local(), Transport: uuid.New()},
&routing.Hop{From: clients[3].Local(), To: clients[4].Local(), Transport: uuid.New()},
&routing.Hop{From: clients[1].Addr.PK, To: clients[2].Addr.PK, Transport: uuid.New()},
&routing.Hop{From: clients[2].Addr.PK, To: clients[3].Addr.PK, Transport: uuid.New()},
&routing.Hop{From: clients[3].Addr.PK, To: clients[4].Addr.PK, Transport: uuid.New()},
},
Forward: routing.Route{
&routing.Hop{From: clients[4].Local(), To: clients[3].Local(), Transport: uuid.New()},
&routing.Hop{From: clients[3].Local(), To: clients[2].Local(), Transport: uuid.New()},
&routing.Hop{From: clients[2].Local(), To: clients[1].Local(), Transport: uuid.New()},
&routing.Hop{From: clients[4].Addr.PK, To: clients[3].Addr.PK, Transport: uuid.New()},
&routing.Hop{From: clients[3].Addr.PK, To: clients[2].Addr.PK, Transport: uuid.New()},
&routing.Hop{From: clients[2].Addr.PK, To: clients[1].Addr.PK, Transport: uuid.New()},
},
Expiry: time.Now().Add(time.Hour),
}

// client_1 initiates loop creation with setup node.
iTp, err := clients[1].Dial(context.TODO(), setupPK)
iTp, err := clients[1].Dial(context.TODO(), setupPK, setupPort)
require.NoError(t, err)
iTpErrs := make(chan error, 2)
go func() {
Expand All @@ -136,7 +151,10 @@ func TestNode(t *testing.T) {
var nextRouteID uint32
// CLOSURE: emulates how a visor node should react when expecting an AddRules packet.
expectAddRules := func(client int, expRule routing.RuleType) {
tp, err := clients[client].Accept(context.TODO())
listener, err := clients[client].Listen(clients[client].Addr.Port)
require.NoError(t, err)

tp, err := listener.AcceptTransport()
require.NoError(t, err)

proto := NewSetupProtocol(tp)
Expand Down Expand Up @@ -165,15 +183,19 @@ func TestNode(t *testing.T) {
_ = proto.WritePacket(RespSuccess, nil) //nolint:errcheck

require.NoError(t, tp.Close())
require.NoError(t, listener.Close())

addRuleDone.Done()
}

// CLOSURE: emulates how a visor node should react when expecting an OnConfirmLoop packet.
expectConfirmLoop := func(client int) {
tp, err := clients[client].Accept(context.TODO())
listener, err := clients[client].Listen(clients[client].Addr.Port)
require.NoError(t, err)
defer func() { require.NoError(t, listener.Close()) }()

tp, err := listener.AcceptTransport()
require.NoError(t, err)
defer func() { require.NoError(t, tp.Close()) }()

proto := NewSetupProtocol(tp)

Expand All @@ -196,6 +218,8 @@ func TestNode(t *testing.T) {

// TODO: This error is not checked due to a bug in dmsg.
_ = proto.WritePacket(RespSuccess, nil) //nolint:errcheck

require.NoError(t, tp.Close())
}

// since the route establishment is asynchronous,
Expand Down Expand Up @@ -227,27 +251,28 @@ func TestNode(t *testing.T) {
defer closeClients()

// prepare and serve setup node.
sn, closeSetup := prepSetupNode(clients[0])
setupPK := sn.dmsgC.Local()
_, closeSetup := prepSetupNode(clients[0].Client, clients[0].Addr.Port)
setupPK := clients[0].Addr.PK
setupPort := clients[0].Addr.Port
defer closeSetup()

// prepare loop data describing the loop that is to be closed.
ld := routing.LoopData{
Loop: routing.Loop{
Local: routing.Addr{
PubKey: clients[1].Local(),
PubKey: clients[1].Addr.PK,
Port: 1,
},
Remote: routing.Addr{
PubKey: clients[2].Local(),
PubKey: clients[2].Addr.PK,
Port: 2,
},
},
RouteID: 3,
}

// client_1 initiates close loop with setup node.
iTp, err := clients[1].Dial(context.TODO(), setupPK)
iTp, err := clients[1].Dial(context.TODO(), setupPK, setupPort)
require.NoError(t, err)
iTpErrs := make(chan error, 2)
go func() {
Expand All @@ -264,7 +289,11 @@ func TestNode(t *testing.T) {
}()

// client_2 accepts close request.
tp, err := clients[2].Accept(context.TODO())
listener, err := clients[2].Listen(clients[2].Addr.Port)
require.NoError(t, err)
defer func() { require.NoError(t, listener.Close()) }()

tp, err := listener.AcceptTransport()
require.NoError(t, err)
defer func() { require.NoError(t, tp.Close()) }()

Expand Down
26 changes: 26 additions & 0 deletions vendor/github.com/skycoin/dmsg/addr.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit 0286073

Please sign in to comment.