diff --git a/cmd/skywire-cli/commands/node/routes.go b/cmd/skywire-cli/commands/node/routes.go index 5dd99431f0..0598cb35ab 100644 --- a/cmd/skywire-cli/commands/node/routes.go +++ b/cmd/skywire-cli/commands/node/routes.go @@ -100,7 +100,7 @@ var addRuleCmd = &cobra.Command{ remotePort = routing.Port(parseUint("remote-port", args[3], 16)) localPort = routing.Port(parseUint("local-port", args[4], 16)) ) - rule = routing.AppRule(keepAlive, routeID, remotePK, remotePort, localPort, 0) + rule = routing.AppRule(keepAlive, 0, routeID, remotePK, localPort, remotePort) case "fwd": var ( nextRouteID = routing.RouteID(parseUint("next-route-id", args[1], 32)) diff --git a/pkg/app/app.go b/pkg/app/app.go index ac67f1030f..16ec17f114 100644 --- a/pkg/app/app.go +++ b/pkg/app/app.go @@ -151,7 +151,9 @@ func (app *App) Close() error { // Accept awaits for incoming loop confirmation request from a Node and // returns net.Conn for received loop. func (app *App) Accept() (net.Conn, error) { + fmt.Println("!!! [ACCEPT] start !!!") addrs := <-app.acceptChan + fmt.Println("!!! [ACCEPT] read from ch !!!") laddr := addrs[0] raddr := addrs[1] @@ -187,6 +189,7 @@ func (app *App) Addr() net.Addr { func (app *App) handleProto() { err := app.proto.Serve(func(frame Frame, payload []byte) (res interface{}, err error) { + fmt.Printf("!!! app received frame: %s\n", frame) switch frame { case FrameConfirmLoop: err = app.confirmLoop(payload) @@ -242,6 +245,8 @@ func (app *App) forwardPacket(data []byte) error { return err } + fmt.Printf("!!! packet loop: %s\n", packet.Loop) + app.mu.Lock() conn := app.conns[packet.Loop] app.mu.Unlock() @@ -272,6 +277,7 @@ func (app *App) closeConn(data []byte) error { } func (app *App) confirmLoop(data []byte) error { + fmt.Println("!!! [confirmLoop] !!!") var addrs [2]routing.Addr if err := json.Unmarshal(data, &addrs); err != nil { return err @@ -288,6 +294,7 @@ func (app *App) confirmLoop(data []byte) error { return errors.New("loop is already created") } + fmt.Println("!!! [confirmLoop] selecting !!!") select { case app.acceptChan <- addrs: default: diff --git a/pkg/router/route_manager_test.go b/pkg/router/route_manager_test.go index e707199099..f40bc5e246 100644 --- a/pkg/router/route_manager_test.go +++ b/pkg/router/route_manager_test.go @@ -71,7 +71,7 @@ func TestNewRouteManager(t *testing.T) { defer clearRules() pk, _ := cipher.GenerateKeyPair() - rule := routing.AppRule(10*time.Minute, 3, pk, 3, 2, 1) + rule := routing.AppRule(10*time.Minute, 1, 3, pk, 2, 3) _, err := rt.AddRule(rule) require.NoError(t, err) @@ -102,6 +102,8 @@ func TestNewRouteManager(t *testing.T) { errCh <- rm.handleSetupConn(delOut) // Receive DeleteRule request. 
close(errCh) }() + + // TODO: remove defer from for loop defer func() { require.NoError(t, requestIDIn.Close()) require.NoError(t, addIn.Close()) @@ -190,7 +192,7 @@ func TestNewRouteManager(t *testing.T) { proto := setup.NewSetupProtocol(in) pk, _ := cipher.GenerateKeyPair() - rule := routing.AppRule(10*time.Minute, 3, pk, 3, 2, 2) + rule := routing.AppRule(10*time.Minute, 2, 3, pk, 2, 3) require.NoError(t, rt.SetRule(2, rule)) rule = routing.ForwardRule(10*time.Minute, 3, uuid.New(), 1) @@ -242,7 +244,7 @@ func TestNewRouteManager(t *testing.T) { proto := setup.NewSetupProtocol(in) pk, _ := cipher.GenerateKeyPair() - rule := routing.AppRule(10*time.Minute, 3, pk, 3, 2, 0) + rule := routing.AppRule(10*time.Minute, 0, 3, pk, 2, 3) require.NoError(t, rt.SetRule(2, rule)) rule = routing.ForwardRule(10*time.Minute, 3, uuid.New(), 1) diff --git a/pkg/router/router.go b/pkg/router/router.go index 00fa525cce..9b49339fe2 100644 --- a/pkg/router/router.go +++ b/pkg/router/router.go @@ -147,6 +147,8 @@ func (r *Router) handlePacket(ctx context.Context, packet routing.Packet) error // ServeApp handles App packets from the App connection on provided port. func (r *Router) ServeApp(conn net.Conn, port routing.Port, appConf *app.Config) error { + fmt.Println("!!! [ServeApp] start !!!") + r.wg.Add(1) defer r.wg.Done() @@ -229,7 +231,7 @@ func (r *Router) consumePacket(payload []byte, rule routing.Rule) error { } fmt.Println("got it!") if err := b.conn.Send(app.FrameSend, p, nil); err != nil { // TODO: Stuck here. - fmt.Println("err:", err) + fmt.Println("!!! Send err:", err) return err } fmt.Println("done") diff --git a/pkg/router/router_test.go b/pkg/router/router_test.go index ca7fec35ae..7e7501047a 100644 --- a/pkg/router/router_test.go +++ b/pkg/router/router_test.go @@ -11,6 +11,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + routeFinder "github.com/skycoin/skywire/pkg/route-finder/client" "github.com/skycoin/skywire/pkg/routing" "github.com/skycoin/skywire/pkg/snet" "github.com/skycoin/skywire/pkg/snet/snettest" @@ -36,7 +37,6 @@ func TestMain(m *testing.M) { // Ensure that received packets are handled properly in `(*Router).Serve()`. func TestRouter_Serve(t *testing.T) { - // We are generating two key pairs - one for the a `Router`, the other to send packets to `Router`. keys := snettest.GenKeyPairs(2) @@ -49,10 +49,10 @@ func TestRouter_Serve(t *testing.T) { // Create routers r0, err := New(nEnv.Nets[0], rEnv.GenRouterConfig(0)) require.NoError(t, err) - //go r0.Serve(context.TODO()) + // go r0.Serve(context.TODO()) r1, err := New(nEnv.Nets[1], rEnv.GenRouterConfig(1)) require.NoError(t, err) - //go r1.Serve(context.TODO()) + // go r1.Serve(context.TODO()) // Create dmsg transport between two `snet.Network` entities. tp1, err := rEnv.TpMngrs[1].SaveTransport(context.TODO(), keys[0].PK, dmsg.Type) @@ -95,6 +95,11 @@ func TestRouter_Serve(t *testing.T) { // TODO(evanlinjin): I'm having so much trouble with this I officially give up. 
//t.Run("handlePacket_appRule", func(t *testing.T) { + // const duration = 10 * time.Second + // // time.AfterFunc(duration, func() { + // // panic("timeout") + // // }) + // // defer clearRules(r0, r1) // // // prepare mock-app @@ -109,34 +114,37 @@ func TestRouter_Serve(t *testing.T) { // } // // // serve mock-app - // sErrCh := make(chan error, 1) + // // sErrCh := make(chan error, 1) // go func() { - // sErrCh <- r0.ServeApp(sConn, localPort, appConf) - // close(sErrCh) - // }() - // defer func() { - // assert.NoError(t, cConn.Close()) - // assert.NoError(t, <-sErrCh) + // // sErrCh <- r0.ServeApp(sConn, localPort, appConf) + // _ = r0.ServeApp(sConn, localPort, appConf) + // // close(sErrCh) // }() + // // defer func() { + // // assert.NoError(t, cConn.Close()) + // // assert.NoError(t, <-sErrCh) + // // }() // // a, err := app.New(cConn, appConf) // require.NoError(t, err) - // cErrCh := make(chan error, 1) + // // cErrCh := make(chan error, 1) // go func() { // conn, err := a.Accept() // if err == nil { // fmt.Println("ACCEPTED:", conn.RemoteAddr()) // } // fmt.Println("FAILED TO ACCEPT") - // cErrCh <- err - // close(cErrCh) - // }() - // defer func() { - // assert.NoError(t, <-cErrCh) + // // cErrCh <- err + // // close(cErrCh) // }() + // a.Dial(a.Addr().(routing.Addr)) + // // defer func() { + // // assert.NoError(t, <-cErrCh) + // // }() // // // Add a APP rule for r0. - // appRule := routing.AppRule(time.Now().Add(time.Hour), routing.RouteID(7), keys[1].PK, routing.Port(8), localPort) + // // port8 := routing.Port(8) + // appRule := routing.AppRule(10*time.Minute, 0, routing.RouteID(7), keys[1].PK, localPort, localPort) // appRtID, err := r0.rm.rt.AddRule(appRule) // require.NoError(t, err) // @@ -146,7 +154,7 @@ func TestRouter_Serve(t *testing.T) { // // payload[0] = frame type, payload[1] = id // rAddr := routing.Addr{PubKey: keys[1].PK, Port: localPort} // rawRAddr, _ := json.Marshal(rAddr) - // //payload := append([]byte{byte(app.FrameClose), 0}, rawRAddr...) + // // payload := append([]byte{byte(app.FrameClose), 0}, rawRAddr...) // packet := routing.MakePacket(appRtID, rawRAddr) // require.NoError(t, r0.handlePacket(context.TODO(), packet)) //}) @@ -204,7 +212,7 @@ func (e *TestEnv) GenRouterConfig(i int) *Config { SecKey: e.TpMngrConfs[i].SecKey, TransportManager: e.TpMngrs[i], RoutingTable: routing.InMemoryRoutingTable(), - RouteFinder: nil, // TODO + RouteFinder: routeFinder.NewMock(), SetupNodes: nil, // TODO GarbageCollectDuration: DefaultGarbageCollectDuration, } diff --git a/pkg/routing/rule.go b/pkg/routing/rule.go index fad1e9ece9..ba23bbc4cc 100644 --- a/pkg/routing/rule.go +++ b/pkg/routing/rule.go @@ -149,7 +149,7 @@ type RuleSummary struct { func (rs *RuleSummary) ToRule() (Rule, error) { if rs.Type == RuleApp && rs.AppFields != nil && rs.ForwardFields == nil { f := rs.AppFields - return AppRule(rs.KeepAlive, f.RespRID, f.RemotePK, f.RemotePort, f.LocalPort, rs.RequestRouteID), nil + return AppRule(rs.KeepAlive, rs.RequestRouteID, f.RespRID, f.RemotePK, f.LocalPort, f.RemotePort), nil } if rs.Type == RuleForward && rs.AppFields == nil && rs.ForwardFields != nil { f := rs.ForwardFields @@ -182,8 +182,7 @@ func (r Rule) Summary() *RuleSummary { } // AppRule constructs a new consume RoutingRule. 
-func AppRule(keepAlive time.Duration, respRoute RouteID, remotePK cipher.PubKey, remotePort, localPort Port, - requestRouteID RouteID) Rule { +func AppRule(keepAlive time.Duration, reqRoute, respRoute RouteID, remotePK cipher.PubKey, localPort, remotePort Port) Rule { rule := make([]byte, RuleHeaderSize) if keepAlive < 0 { @@ -198,7 +197,7 @@ func AppRule(keepAlive time.Duration, respRoute RouteID, remotePK cipher.PubKey, rule = append(rule, bytes.Repeat([]byte{0}, 8)...) binary.BigEndian.PutUint16(rule[46:], uint16(remotePort)) binary.BigEndian.PutUint16(rule[48:], uint16(localPort)) - binary.BigEndian.PutUint32(rule[50:], uint32(requestRouteID)) + binary.BigEndian.PutUint32(rule[50:], uint32(reqRoute)) return rule } diff --git a/pkg/routing/rule_test.go b/pkg/routing/rule_test.go index c4aedb5dfe..93ec1b6ef1 100644 --- a/pkg/routing/rule_test.go +++ b/pkg/routing/rule_test.go @@ -12,7 +12,7 @@ import ( func TestAppRule(t *testing.T) { keepAlive := 2 * time.Minute pk, _ := cipher.GenerateKeyPair() - rule := AppRule(keepAlive, 2, pk, 3, 4, 1) + rule := AppRule(keepAlive, 1, 2, pk, 4, 3) assert.Equal(t, keepAlive, rule.KeepAlive()) assert.Equal(t, RuleApp, rule.Type()) diff --git a/pkg/setup/node.go b/pkg/setup/node.go index 6872e6fb4a..5577b2b2af 100644 --- a/pkg/setup/node.go +++ b/pkg/setup/node.go @@ -268,7 +268,7 @@ func (sn *Node) createRoute(ctx context.Context, keepAlive time.Duration, route nextTpID = r[i+1].Transport rule = routing.ForwardRule(keepAlive, 0, nextTpID, 0) } else { - rule = routing.AppRule(keepAlive, 0, init, lport, rport, 0) + rule = routing.AppRule(keepAlive, 0, 0, init, lport, rport) } go func(i int, pk cipher.PubKey, rule routing.Rule, reqIDChIn <-chan routing.RouteID, diff --git a/pkg/setup/node_test.go b/pkg/setup/node_test.go index 4a44dac6a6..364048dfdb 100644 --- a/pkg/setup/node_test.go +++ b/pkg/setup/node_test.go @@ -38,323 +38,324 @@ func TestMain(m *testing.M) { // 3. Hanging may be not the problem of the DMSG. Probably some of the communication part here is wrong. // The reason I think so is that - if we ensure read timeouts, why doesn't this test constantly fail? // Maybe some wrapper for DMSG is wrong, or some internal operations before the actual communication behave bad -/*func TestNode(t *testing.T) { - // Prepare mock dmsg discovery. - discovery := disc.NewMock() - - // Prepare dmsg server. - server, serverErr := createServer(t, discovery) - defer func() { - require.NoError(t, server.Close()) - require.NoError(t, errWithTimeout(serverErr)) - }() - - type clientWithDMSGAddrAndListener struct { - *dmsg.Client - Addr dmsg.Addr - Listener *dmsg.Listener - } - - // CLOSURE: sets up dmsg clients. 
- prepClients := func(n int) ([]clientWithDMSGAddrAndListener, func()) { - clients := make([]clientWithDMSGAddrAndListener, n) - for i := 0; i < n; i++ { - var port uint16 - // setup node - if i == 0 { - port = snet.SetupPort - } else { - port = snet.AwaitSetupPort - } - pk, sk, err := cipher.GenerateDeterministicKeyPair([]byte{byte(i)}) - require.NoError(t, err) - t.Logf("client[%d] PK: %s\n", i, pk) - c := dmsg.NewClient(pk, sk, discovery, dmsg.SetLogger(logging.MustGetLogger(fmt.Sprintf("client_%d:%s:%d", i, pk, port)))) - require.NoError(t, c.InitiateServerConnections(context.TODO(), 1)) - listener, err := c.Listen(port) - require.NoError(t, err) - clients[i] = clientWithDMSGAddrAndListener{ - Client: c, - Addr: dmsg.Addr{ - PK: pk, - Port: port, - }, - Listener: listener, - } - } - return clients, func() { - for _, c := range clients { - //require.NoError(t, c.Listener.Close()) - require.NoError(t, c.Close()) - } - } - } - - // CLOSURE: sets up setup node. - prepSetupNode := func(c *dmsg.Client, listener *dmsg.Listener) (*Node, func()) { - sn := &Node{ - Logger: logging.MustGetLogger("setup_node"), - dmsgC: c, - dmsgL: listener, - metrics: metrics.NewDummy(), - } - go func() { - if err := sn.Serve(context.TODO()); err != nil { - sn.Logger.WithError(err).Error("Failed to serve") - } - }() - return sn, func() { - require.NoError(t, sn.Close()) - } - } - - // TEST: Emulates the communication between 4 visor nodes and a setup node, - // where the first client node initiates a loop to the last. - t.Run("CreateLoop", func(t *testing.T) { - // client index 0 is for setup node. - // clients index 1 to 4 are for visor nodes. - clients, closeClients := prepClients(5) - defer closeClients() - - // prepare and serve setup node (using client 0). - _, closeSetup := prepSetupNode(clients[0].Client, clients[0].Listener) - setupPK := clients[0].Addr.PK - setupPort := clients[0].Addr.Port - defer closeSetup() - - // prepare loop creation (client_1 will use this to request loop creation with setup node). - ld := routing.LoopDescriptor{ - Loop: routing.Loop{ - Local: routing.Addr{PubKey: clients[1].Addr.PK, Port: 1}, - Remote: routing.Addr{PubKey: clients[4].Addr.PK, Port: 1}, - }, - Reverse: routing.Route{ - &routing.Hop{From: clients[1].Addr.PK, To: clients[2].Addr.PK, Transport: uuid.New()}, - &routing.Hop{From: clients[2].Addr.PK, To: clients[3].Addr.PK, Transport: uuid.New()}, - &routing.Hop{From: clients[3].Addr.PK, To: clients[4].Addr.PK, Transport: uuid.New()}, - }, - Forward: routing.Route{ - &routing.Hop{From: clients[4].Addr.PK, To: clients[3].Addr.PK, Transport: uuid.New()}, - &routing.Hop{From: clients[3].Addr.PK, To: clients[2].Addr.PK, Transport: uuid.New()}, - &routing.Hop{From: clients[2].Addr.PK, To: clients[1].Addr.PK, Transport: uuid.New()}, - }, - KeepAlive: 1 * time.Hour, - } - - // client_1 initiates loop creation with setup node. - iTp, err := clients[1].Dial(context.TODO(), setupPK, setupPort) - require.NoError(t, err) - iTpErrs := make(chan error, 2) - go func() { - iTpErrs <- CreateLoop(context.TODO(), NewSetupProtocol(iTp), ld) - iTpErrs <- iTp.Close() - close(iTpErrs) - }() - defer func() { - i := 0 - for err := range iTpErrs { - require.NoError(t, err, i) - i++ - } - }() - - var addRuleDone sync.WaitGroup - var nextRouteID uint32 - // CLOSURE: emulates how a visor node should react when expecting an AddRules packet. 
- expectAddRules := func(client int, expRule routing.RuleType) { - conn, err := clients[client].Listener.Accept() - require.NoError(t, err) - - fmt.Printf("client %v:%v accepted\n", client, clients[client].Addr) - - proto := NewSetupProtocol(conn) - - pt, _, err := proto.ReadPacket() - require.NoError(t, err) - require.Equal(t, PacketRequestRouteID, pt) - - fmt.Printf("client %v:%v got PacketRequestRouteID\n", client, clients[client].Addr) - - routeID := atomic.AddUint32(&nextRouteID, 1) - - err = proto.WritePacket(RespSuccess, []routing.RouteID{routing.RouteID(routeID)}) - require.NoError(t, err) - - fmt.Printf("client %v:%v responded to with registration ID: %v\n", client, clients[client].Addr, routeID) - - require.NoError(t, conn.Close()) - - conn, err = clients[client].Listener.Accept() - require.NoError(t, err) - - fmt.Printf("client %v:%v accepted 2nd time\n", client, clients[client].Addr) - - proto = NewSetupProtocol(conn) - - pt, pp, err := proto.ReadPacket() - require.NoError(t, err) - require.Equal(t, PacketAddRules, pt) - - fmt.Printf("client %v:%v got PacketAddRules\n", client, clients[client].Addr) - - var rs []routing.Rule - require.NoError(t, json.Unmarshal(pp, &rs)) - - for _, r := range rs { - require.Equal(t, expRule, r.Type()) - } - - // TODO: This error is not checked due to a bug in dmsg. - err = proto.WritePacket(RespSuccess, nil) - _ = err - - fmt.Printf("client %v:%v responded for PacketAddRules\n", client, clients[client].Addr) - - require.NoError(t, conn.Close()) - - addRuleDone.Done() - } - - // CLOSURE: emulates how a visor node should react when expecting an OnConfirmLoop packet. - expectConfirmLoop := func(client int) { - tp, err := clients[client].Listener.AcceptTransport() - require.NoError(t, err) - - proto := NewSetupProtocol(tp) - - pt, pp, err := proto.ReadPacket() - require.NoError(t, err) - require.Equal(t, PacketConfirmLoop, pt) - - var d routing.LoopData - require.NoError(t, json.Unmarshal(pp, &d)) - - switch client { - case 1: - require.Equal(t, ld.Loop, d.Loop) - case 4: - require.Equal(t, ld.Loop.Local, d.Loop.Remote) - require.Equal(t, ld.Loop.Remote, d.Loop.Local) - default: - t.Fatalf("We shouldn't be receiving a OnConfirmLoop packet from client %d", client) - } - - // TODO: This error is not checked due to a bug in dmsg. - err = proto.WritePacket(RespSuccess, nil) - _ = err - - require.NoError(t, tp.Close()) - } - - // since the route establishment is asynchronous, - // we must expect all the messages in parallel - addRuleDone.Add(4) - go expectAddRules(4, routing.RuleApp) - go expectAddRules(3, routing.RuleForward) - go expectAddRules(2, routing.RuleForward) - go expectAddRules(1, routing.RuleForward) - addRuleDone.Wait() - fmt.Println("FORWARD ROUTE DONE") - addRuleDone.Add(4) - go expectAddRules(1, routing.RuleApp) - go expectAddRules(2, routing.RuleForward) - go expectAddRules(3, routing.RuleForward) - go expectAddRules(4, routing.RuleForward) - addRuleDone.Wait() - fmt.Println("REVERSE ROUTE DONE") - expectConfirmLoop(1) - expectConfirmLoop(4) - }) - - // TEST: Emulates the communication between 2 visor nodes and a setup nodes, - // where a route is already established, - // and the first client attempts to tear it down. - t.Run("CloseLoop", func(t *testing.T) { - // client index 0 is for setup node. - // clients index 1 and 2 are for visor nodes. - clients, closeClients := prepClients(3) - defer closeClients() - - // prepare and serve setup node. 
- _, closeSetup := prepSetupNode(clients[0].Client, clients[0].Listener) - setupPK := clients[0].Addr.PK - setupPort := clients[0].Addr.Port - defer closeSetup() - - // prepare loop data describing the loop that is to be closed. - ld := routing.LoopData{ - Loop: routing.Loop{ - Local: routing.Addr{ - PubKey: clients[1].Addr.PK, - Port: 1, - }, - Remote: routing.Addr{ - PubKey: clients[2].Addr.PK, - Port: 2, - }, - }, - RouteID: 3, - } - - // client_1 initiates close loop with setup node. - iTp, err := clients[1].Dial(context.TODO(), setupPK, setupPort) - require.NoError(t, err) - iTpErrs := make(chan error, 2) - go func() { - iTpErrs <- CloseLoop(context.TODO(), NewSetupProtocol(iTp), ld) - iTpErrs <- iTp.Close() - close(iTpErrs) - }() - defer func() { - i := 0 - for err := range iTpErrs { - require.NoError(t, err, i) - i++ - } - }() - - // client_2 accepts close request. - tp, err := clients[2].Listener.AcceptTransport() - require.NoError(t, err) - defer func() { require.NoError(t, tp.Close()) }() - - proto := NewSetupProtocol(tp) - - pt, pp, err := proto.ReadPacket() - require.NoError(t, err) - require.Equal(t, PacketLoopClosed, pt) - - var d routing.LoopData - require.NoError(t, json.Unmarshal(pp, &d)) - require.Equal(t, ld.Loop.Remote, d.Loop.Local) - require.Equal(t, ld.Loop.Local, d.Loop.Remote) - - // TODO: This error is not checked due to a bug in dmsg. - err = proto.WritePacket(RespSuccess, nil) - _ = err - }) -}*/ - -// func createServer(t *testing.T, dc disc.APIClient) (srv *dmsg.Server, srvErr <-chan error) { -// pk, sk, err := cipher.GenerateDeterministicKeyPair([]byte("s")) -// require.NoError(t, err) -// l, err := nettest.NewLocalListener("tcp") -// require.NoError(t, err) -// srv, err = dmsg.NewServer(pk, sk, "", l, dc) -// require.NoError(t, err) -// errCh := make(chan error, 1) -// go func() { -// errCh <- srv.Serve() -// close(errCh) -// }() -// return srv, errCh -// } -// -// func errWithTimeout(ch <-chan error) error { -// select { -// case err := <-ch: -// return err -// case <-time.After(5 * time.Second): -// return errors.New("timeout") -// } -// } +//func TestNode(t *testing.T) { +// // Prepare mock dmsg discovery. +// discovery := disc.NewMock() +// +// // Prepare dmsg server. +// server, serverErr := createServer(t, discovery) +// defer func() { +// require.NoError(t, server.Close()) +// require.NoError(t, errWithTimeout(serverErr)) +// }() +// +// type clientWithDMSGAddrAndListener struct { +// *dmsg.Client +// Addr dmsg.Addr +// Listener *dmsg.Listener +// } +// +// // CLOSURE: sets up dmsg clients. 
+// prepClients := func(n int) ([]clientWithDMSGAddrAndListener, func()) { +// clients := make([]clientWithDMSGAddrAndListener, n) +// for i := 0; i < n; i++ { +// var port uint16 +// // setup node +// if i == 0 { +// port = snet.SetupPort +// } else { +// port = snet.AwaitSetupPort +// } +// pk, sk, err := cipher.GenerateDeterministicKeyPair([]byte{byte(i)}) +// require.NoError(t, err) +// t.Logf("client[%d] PK: %s\n", i, pk) +// c := dmsg.NewClient(pk, sk, discovery, dmsg.SetLogger(logging.MustGetLogger(fmt.Sprintf("client_%d:%s:%d", i, pk, port)))) +// require.NoError(t, c.InitiateServerConnections(context.TODO(), 1)) +// listener, err := c.Listen(port) +// require.NoError(t, err) +// clients[i] = clientWithDMSGAddrAndListener{ +// Client: c, +// Addr: dmsg.Addr{ +// PK: pk, +// Port: port, +// }, +// Listener: listener, +// } +// } +// return clients, func() { +// for _, c := range clients { +// //require.NoError(t, c.Listener.Close()) +// require.NoError(t, c.Close()) +// } +// } +// } +// +// // CLOSURE: sets up setup node. +// prepSetupNode := func(c *dmsg.Client, listener *dmsg.Listener) (*Node, func()) { +// sn := &Node{ +// Logger: logging.MustGetLogger("setup_node"), +// dmsgC: c, +// dmsgL: listener, +// metrics: metrics.NewDummy(), +// } +// go func() { +// if err := sn.Serve(context.TODO()); err != nil { +// sn.Logger.WithError(err).Error("Failed to serve") +// } +// }() +// return sn, func() { +// require.NoError(t, sn.Close()) +// } +// } +// +// // TEST: Emulates the communication between 4 visor nodes and a setup node, +// // where the first client node initiates a loop to the last. +// t.Run("CreateLoop", func(t *testing.T) { +// // client index 0 is for setup node. +// // clients index 1 to 4 are for visor nodes. +// clients, closeClients := prepClients(5) +// defer closeClients() +// +// // prepare and serve setup node (using client 0). +// _, closeSetup := prepSetupNode(clients[0].Client, clients[0].Listener) +// setupPK := clients[0].Addr.PK +// setupPort := clients[0].Addr.Port +// defer closeSetup() +// +// // prepare loop creation (client_1 will use this to request loop creation with setup node). +// ld := routing.LoopDescriptor{ +// Loop: routing.Loop{ +// Local: routing.Addr{PubKey: clients[1].Addr.PK, Port: 1}, +// Remote: routing.Addr{PubKey: clients[4].Addr.PK, Port: 1}, +// }, +// Reverse: routing.Route{ +// &routing.Hop{From: clients[1].Addr.PK, To: clients[2].Addr.PK, Transport: uuid.New()}, +// &routing.Hop{From: clients[2].Addr.PK, To: clients[3].Addr.PK, Transport: uuid.New()}, +// &routing.Hop{From: clients[3].Addr.PK, To: clients[4].Addr.PK, Transport: uuid.New()}, +// }, +// Forward: routing.Route{ +// &routing.Hop{From: clients[4].Addr.PK, To: clients[3].Addr.PK, Transport: uuid.New()}, +// &routing.Hop{From: clients[3].Addr.PK, To: clients[2].Addr.PK, Transport: uuid.New()}, +// &routing.Hop{From: clients[2].Addr.PK, To: clients[1].Addr.PK, Transport: uuid.New()}, +// }, +// KeepAlive: 1 * time.Hour, +// } +// +// // client_1 initiates loop creation with setup node. 
+// iTp, err := clients[1].Dial(context.TODO(), setupPK, setupPort) +// require.NoError(t, err) +// iTpErrs := make(chan error, 2) +// go func() { +// iTpErrs <- CreateLoop(context.TODO(), NewSetupProtocol(iTp), ld) +// iTpErrs <- iTp.Close() +// close(iTpErrs) +// }() +// defer func() { +// i := 0 +// for err := range iTpErrs { +// require.NoError(t, err, i) +// i++ +// } +// }() +// +// var addRuleDone sync.WaitGroup +// var nextRouteID uint32 +// // CLOSURE: emulates how a visor node should react when expecting an AddRules packet. +// expectAddRules := func(client int, expRule routing.RuleType) { +// conn, err := clients[client].Listener.Accept() +// require.NoError(t, err) +// +// fmt.Printf("client %v:%v accepted\n", client, clients[client].Addr) +// +// proto := NewSetupProtocol(conn) +// +// pt, _, err := proto.ReadPacket() +// require.NoError(t, err) +// require.Equal(t, PacketRequestRouteID, pt) +// +// fmt.Printf("client %v:%v got PacketRequestRouteID\n", client, clients[client].Addr) +// +// routeID := atomic.AddUint32(&nextRouteID, 1) +// +// // TODO: This error is not checked due to a bug in dmsg. +// _ = proto.WritePacket(RespSuccess, []routing.RouteID{routing.RouteID(routeID)}) // nolint:errcheck +// require.NoError(t, err) +// +// fmt.Printf("client %v:%v responded to with registration ID: %v\n", client, clients[client].Addr, routeID) +// +// require.NoError(t, conn.Close()) +// +// conn, err = clients[client].Listener.Accept() +// require.NoError(t, err) +// +// fmt.Printf("client %v:%v accepted 2nd time\n", client, clients[client].Addr) +// +// proto = NewSetupProtocol(conn) +// +// pt, pp, err := proto.ReadPacket() +// require.NoError(t, err) +// require.Equal(t, PacketAddRules, pt) +// +// fmt.Printf("client %v:%v got PacketAddRules\n", client, clients[client].Addr) +// +// var rs []routing.Rule +// require.NoError(t, json.Unmarshal(pp, &rs)) +// +// for _, r := range rs { +// require.Equal(t, expRule, r.Type()) +// } +// +// // TODO: This error is not checked due to a bug in dmsg. +// err = proto.WritePacket(RespSuccess, nil) +// _ = err +// +// fmt.Printf("client %v:%v responded for PacketAddRules\n", client, clients[client].Addr) +// +// require.NoError(t, conn.Close()) +// +// addRuleDone.Done() +// } +// +// // CLOSURE: emulates how a visor node should react when expecting an OnConfirmLoop packet. +// expectConfirmLoop := func(client int) { +// tp, err := clients[client].Listener.AcceptTransport() +// require.NoError(t, err) +// +// proto := NewSetupProtocol(tp) +// +// pt, pp, err := proto.ReadPacket() +// require.NoError(t, err) +// require.Equal(t, PacketConfirmLoop, pt) +// +// var d routing.LoopData +// require.NoError(t, json.Unmarshal(pp, &d)) +// +// switch client { +// case 1: +// require.Equal(t, ld.Loop, d.Loop) +// case 4: +// require.Equal(t, ld.Loop.Local, d.Loop.Remote) +// require.Equal(t, ld.Loop.Remote, d.Loop.Local) +// default: +// t.Fatalf("We shouldn't be receiving a OnConfirmLoop packet from client %d", client) +// } +// +// // TODO: This error is not checked due to a bug in dmsg. 
+// err = proto.WritePacket(RespSuccess, nil) +// _ = err +// +// require.NoError(t, tp.Close()) +// } +// +// // since the route establishment is asynchronous, +// // we must expect all the messages in parallel +// addRuleDone.Add(4) +// go expectAddRules(4, routing.RuleApp) +// go expectAddRules(3, routing.RuleForward) +// go expectAddRules(2, routing.RuleForward) +// go expectAddRules(1, routing.RuleForward) +// addRuleDone.Wait() +// fmt.Println("FORWARD ROUTE DONE") +// addRuleDone.Add(4) +// go expectAddRules(1, routing.RuleApp) +// go expectAddRules(2, routing.RuleForward) +// go expectAddRules(3, routing.RuleForward) +// go expectAddRules(4, routing.RuleForward) +// addRuleDone.Wait() +// fmt.Println("REVERSE ROUTE DONE") +// expectConfirmLoop(1) +// expectConfirmLoop(4) +// }) +// +// // TEST: Emulates the communication between 2 visor nodes and a setup nodes, +// // where a route is already established, +// // and the first client attempts to tear it down. +// t.Run("CloseLoop", func(t *testing.T) { +// // client index 0 is for setup node. +// // clients index 1 and 2 are for visor nodes. +// clients, closeClients := prepClients(3) +// defer closeClients() +// +// // prepare and serve setup node. +// _, closeSetup := prepSetupNode(clients[0].Client, clients[0].Listener) +// setupPK := clients[0].Addr.PK +// setupPort := clients[0].Addr.Port +// defer closeSetup() +// +// // prepare loop data describing the loop that is to be closed. +// ld := routing.LoopData{ +// Loop: routing.Loop{ +// Local: routing.Addr{ +// PubKey: clients[1].Addr.PK, +// Port: 1, +// }, +// Remote: routing.Addr{ +// PubKey: clients[2].Addr.PK, +// Port: 2, +// }, +// }, +// RouteID: 3, +// } +// +// // client_1 initiates close loop with setup node. +// iTp, err := clients[1].Dial(context.TODO(), setupPK, setupPort) +// require.NoError(t, err) +// iTpErrs := make(chan error, 2) +// go func() { +// iTpErrs <- CloseLoop(context.TODO(), NewSetupProtocol(iTp), ld) +// iTpErrs <- iTp.Close() +// close(iTpErrs) +// }() +// defer func() { +// i := 0 +// for err := range iTpErrs { +// require.NoError(t, err, i) +// i++ +// } +// }() +// +// // client_2 accepts close request. +// tp, err := clients[2].Listener.AcceptTransport() +// require.NoError(t, err) +// defer func() { require.NoError(t, tp.Close()) }() +// +// proto := NewSetupProtocol(tp) +// +// pt, pp, err := proto.ReadPacket() +// require.NoError(t, err) +// require.Equal(t, PacketLoopClosed, pt) +// +// var d routing.LoopData +// require.NoError(t, json.Unmarshal(pp, &d)) +// require.Equal(t, ld.Loop.Remote, d.Loop.Local) +// require.Equal(t, ld.Loop.Local, d.Loop.Remote) +// +// // TODO: This error is not checked due to a bug in dmsg. 
+// err = proto.WritePacket(RespSuccess, nil) +// _ = err +// }) +//} +// +//func createServer(t *testing.T, dc disc.APIClient) (srv *dmsg.Server, srvErr <-chan error) { +// pk, sk, err := cipher.GenerateDeterministicKeyPair([]byte("s")) +// require.NoError(t, err) +// l, err := nettest.NewLocalListener("tcp") +// require.NoError(t, err) +// srv, err = dmsg.NewServer(pk, sk, "", l, dc) +// require.NoError(t, err) +// errCh := make(chan error, 1) +// go func() { +// errCh <- srv.Serve() +// close(errCh) +// }() +// return srv, errCh +//} +// +//func errWithTimeout(ch <-chan error) error { +// select { +// case err := <-ch: +// return err +// case <-time.After(5 * time.Second): +// return errors.New("timeout") +// } +//} diff --git a/pkg/visor/config.go b/pkg/visor/config.go index fc08536eb2..36451dd6d0 100644 --- a/pkg/visor/config.go +++ b/pkg/visor/config.go @@ -73,7 +73,6 @@ type Config struct { // MessagingConfig returns config for dmsg client. func (c *Config) MessagingConfig() (*DmsgConfig, error) { - msgConfig := c.Messaging if msgConfig.Discovery == "" { diff --git a/pkg/visor/rpc_client.go b/pkg/visor/rpc_client.go index 8fda4a4f27..33a01cc14d 100644 --- a/pkg/visor/rpc_client.go +++ b/pkg/visor/rpc_client.go @@ -232,7 +232,7 @@ func NewMockRPCClient(r *rand.Rand, maxTps int, maxRules int) (cipher.PubKey, RP if err != nil { panic(err) } - appRule := routing.AppRule(ruleKeepAlive, fwdRID, remotePK, rp, lp, appRID) + appRule := routing.AppRule(ruleKeepAlive, appRID, fwdRID, remotePK, lp, rp) if err := rt.SetRule(appRID, appRule); err != nil { panic(err) } diff --git a/pkg/visor/rpc_test.go b/pkg/visor/rpc_test.go index 9f68f65596..7778105edf 100644 --- a/pkg/visor/rpc_test.go +++ b/pkg/visor/rpc_test.go @@ -90,200 +90,205 @@ func TestStartStopApp(t *testing.T) { node.startedMu.Unlock() } -// TODO(nkryuchkov): fix and uncomment /* -func TestRPC(t *testing.T) { - r := new(mockRouter) - executer := new(MockExecuter) - defer func() { - require.NoError(t, os.RemoveAll("skychat")) - }() - - pk1, _, tm1, tm2, errCh, err := transport.MockTransportManagersPair() - - require.NoError(t, err) - defer func() { - require.NoError(t, tm1.Close()) - require.NoError(t, tm2.Close()) - require.NoError(t, <-errCh) - require.NoError(t, <-errCh) - }() - - _, err = tm2.SaveTransport(context.TODO(), pk1, snet.DmsgType) - require.NoError(t, err) - - apps := []AppConfig{ - {App: "foo", Version: "1.0", AutoStart: false, Port: 10}, - {App: "bar", Version: "2.0", AutoStart: false, Port: 20}, - } - conf := &Config{} - conf.Node.StaticPubKey = pk1 - node := &Node{ - config: conf, - router: r, - tm: tm1, - rt: routing.InMemoryRoutingTable(), - executer: executer, - appsConf: apps, - startedApps: map[string]*appBind{}, - logger: logging.MustGetLogger("test"), - } - pathutil.EnsureDir(node.dir()) - defer func() { - if err := os.RemoveAll(node.dir()); err != nil { - log.WithError(err).Warn(err) - } - }() - - require.NoError(t, node.StartApp("foo")) - - time.Sleep(time.Second) - gateway := &RPC{node: node} - - sConn, cConn := net.Pipe() - defer func() { - require.NoError(t, sConn.Close()) - require.NoError(t, cConn.Close()) - }() - - svr := rpc.NewServer() - require.NoError(t, svr.RegisterName(RPCPrefix, gateway)) - go svr.ServeConn(sConn) - - // client := RPCClient{Client: rpc.NewClient(cConn)} - - printFunc := func(t *testing.T, name string, v interface{}) { - j, err := json.MarshalIndent(v, name+": ", " ") - require.NoError(t, err) - t.Log(string(j)) - } - - t.Run("Summary", func(t *testing.T) { - test := func(t 
*testing.T, summary *Summary) { - assert.Equal(t, pk1, summary.PubKey) - assert.Len(t, summary.Apps, 2) - assert.Len(t, summary.Transports, 1) - printFunc(t, "Summary", summary) - } - t.Run("RPCServer", func(t *testing.T) { - var summary Summary - require.NoError(t, gateway.Summary(&struct{}{}, &summary)) - test(t, &summary) - }) - // t.Run("RPCClient", func(t *testing.T) { - // summary, err := client.Summary() - // require.NoError(t, err) - // test(t, summary) - // }) - }) - - t.Run("Exec", func(t *testing.T) { - command := "echo 1" - - t.Run("RPCServer", func(t *testing.T) { - var out []byte - require.NoError(t, gateway.Exec(&command, &out)) - assert.Equal(t, []byte("1\n"), out) - }) - - // t.Run("RPCClient", func(t *testing.T) { - // out, err := client.Exec(command) - // require.NoError(t, err) - // assert.Equal(t, []byte("1\n"), out) - // }) - }) - - t.Run("Apps", func(t *testing.T) { - test := func(t *testing.T, apps []*AppState) { - assert.Len(t, apps, 2) - printFunc(t, "Apps", apps) - } - t.Run("RPCServer", func(t *testing.T) { - var apps []*AppState - require.NoError(t, gateway.Apps(&struct{}{}, &apps)) - test(t, apps) - }) - // t.Run("RPCClient", func(t *testing.T) { - // apps, err := client.Apps() - // require.NoError(t, err) - // test(t, apps) - // }) - }) - - // TODO(evanlinjin): For some reason, this freezes. - // t.Run("StopStartApp", func(t *testing.T) { - // appName := "foo" - // require.NoError(t, gateway.StopApp(&appName, &struct{}{})) - // require.NoError(t, gateway.StartApp(&appName, &struct{}{})) - // require.NoError(t, client.StopApp(appName)) - // require.NoError(t, client.StartApp(appName)) - // }) - - t.Run("SetAutoStart", func(t *testing.T) { - unknownAppName := "whoAmI" - appName := "foo" - - in1 := SetAutoStartIn{AppName: unknownAppName, AutoStart: true} - in2 := SetAutoStartIn{AppName: appName, AutoStart: true} - in3 := SetAutoStartIn{AppName: appName, AutoStart: false} - - // Test with RPC Server - - err := gateway.SetAutoStart(&in1, &struct{}{}) - require.Error(t, err) - assert.Equal(t, ErrUnknownApp, err) - - require.NoError(t, gateway.SetAutoStart(&in2, &struct{}{})) - assert.True(t, node.appsConf[0].AutoStart) - - require.NoError(t, gateway.SetAutoStart(&in3, &struct{}{})) - assert.False(t, node.appsConf[0].AutoStart) - - // Test with RPC Client - - // err = client.SetAutoStart(in1.AppName, in1.AutoStart) - // require.Error(t, err) - // assert.Equal(t, ErrUnknownApp.Error(), err.Error()) - // - // require.NoError(t, client.SetAutoStart(in2.AppName, in2.AutoStart)) - // assert.True(t, node.appsConf[0].AutoStart) - // - // require.NoError(t, client.SetAutoStart(in3.AppName, in3.AutoStart)) - // assert.False(t, node.appsConf[0].AutoStart) - }) - - t.Run("TransportTypes", func(t *testing.T) { - in := TransportsIn{ShowLogs: true} - - var out []*TransportSummary - require.NoError(t, gateway.Transports(&in, &out)) - assert.Len(t, out, 1) - assert.Equal(t, "mock", out[0].Type) - - // out2, err := client.Transports(in.FilterTypes, in.FilterPubKeys, in.ShowLogs) - // require.NoError(t, err) - // assert.Equal(t, out, out2) - }) - - //t.Run("Transport", func(t *testing.T) { - // var ids []uuid.UUID - // node.tm.WalkTransports(func(tp *transport.ManagedTransport) bool { - // ids = append(ids, tp.Entry.ID) - // return true - // }) - // - // for _, id := range ids { - // id := id - // var summary TransportSummary - // require.NoError(t, gateway.Transport(&id, &summary)) - // - // summary2, err := client.Transport(id) - // require.NoError(t, err) - // require.Equal(t, 
summary, *summary2) - // } - //}) - - // TODO: Test add/remove transports - -} +TODO(evanlinjin): Fix these tests. +These tests have been commented out for the following reasons: +- We can't seem to get them to work. +- Mock transport causes too much issues so we deleted it. */ + +//func TestRPC(t *testing.T) { +// r := new(mockRouter) +// executer := new(MockExecuter) +// defer func() { +// require.NoError(t, os.RemoveAll("skychat")) +// }() +// +// pk1, _, tm1, tm2, errCh, err := transport.MockTransportManagersPair() +// +// require.NoError(t, err) +// defer func() { +// require.NoError(t, tm1.Close()) +// require.NoError(t, tm2.Close()) +// require.NoError(t, <-errCh) +// require.NoError(t, <-errCh) +// }() +// +// _, err = tm2.SaveTransport(context.TODO(), pk1, snet.DmsgType) +// require.NoError(t, err) +// +// apps := []AppConfig{ +// {App: "foo", Version: "1.0", AutoStart: false, Port: 10}, +// {App: "bar", Version: "2.0", AutoStart: false, Port: 20}, +// } +// conf := &Config{} +// conf.Node.StaticPubKey = pk1 +// node := &Node{ +// config: conf, +// router: r, +// tm: tm1, +// rt: routing.InMemoryRoutingTable(), +// executer: executer, +// appsConf: apps, +// startedApps: map[string]*appBind{}, +// logger: logging.MustGetLogger("test"), +// } +// pathutil.EnsureDir(node.dir()) +// defer func() { +// if err := os.RemoveAll(node.dir()); err != nil { +// log.WithError(err).Warn(err) +// } +// }() +// +// require.NoError(t, node.StartApp("foo")) +// +// time.Sleep(time.Second) +// gateway := &RPC{node: node} +// +// sConn, cConn := net.Pipe() +// defer func() { +// require.NoError(t, sConn.Close()) +// require.NoError(t, cConn.Close()) +// }() +// +// svr := rpc.NewServer() +// require.NoError(t, svr.RegisterName(RPCPrefix, gateway)) +// go svr.ServeConn(sConn) +// +// // client := RPCClient{Client: rpc.NewClient(cConn)} +// client := NewRPCClient(rpc.NewClient(cConn), "") +// +// printFunc := func(t *testing.T, name string, v interface{}) { +// j, err := json.MarshalIndent(v, name+": ", " ") +// require.NoError(t, err) +// t.Log(string(j)) +// } +// +// t.Run("Summary", func(t *testing.T) { +// test := func(t *testing.T, summary *Summary) { +// assert.Equal(t, pk1, summary.PubKey) +// assert.Len(t, summary.Apps, 2) +// assert.Len(t, summary.Transports, 1) +// printFunc(t, "Summary", summary) +// } +// t.Run("RPCServer", func(t *testing.T) { +// var summary Summary +// require.NoError(t, gateway.Summary(&struct{}{}, &summary)) +// test(t, &summary) +// }) +// t.Run("RPCClient", func(t *testing.T) { +// summary, err := client.Summary() +// require.NoError(t, err) +// test(t, summary) +// }) +// }) +// +// t.Run("Exec", func(t *testing.T) { +// command := "echo 1" +// +// t.Run("RPCServer", func(t *testing.T) { +// var out []byte +// require.NoError(t, gateway.Exec(&command, &out)) +// assert.Equal(t, []byte("1\n"), out) +// }) +// +// t.Run("RPCClient", func(t *testing.T) { +// out, err := client.Exec(command) +// require.NoError(t, err) +// assert.Equal(t, []byte("1\n"), out) +// }) +// }) +// +// t.Run("Apps", func(t *testing.T) { +// test := func(t *testing.T, apps []*AppState) { +// assert.Len(t, apps, 2) +// printFunc(t, "Apps", apps) +// } +// t.Run("RPCServer", func(t *testing.T) { +// var apps []*AppState +// require.NoError(t, gateway.Apps(&struct{}{}, &apps)) +// test(t, apps) +// }) +// t.Run("RPCClient", func(t *testing.T) { +// apps, err := client.Apps() +// require.NoError(t, err) +// test(t, apps) +// }) +// }) +// +// // TODO(evanlinjin): For some reason, this freezes. 
+// t.Run("StopStartApp", func(t *testing.T) { +// appName := "foo" +// require.NoError(t, gateway.StopApp(&appName, &struct{}{})) +// require.NoError(t, gateway.StartApp(&appName, &struct{}{})) +// require.NoError(t, client.StopApp(appName)) +// require.NoError(t, client.StartApp(appName)) +// }) +// +// t.Run("SetAutoStart", func(t *testing.T) { +// unknownAppName := "whoAmI" +// appName := "foo" +// +// in1 := SetAutoStartIn{AppName: unknownAppName, AutoStart: true} +// in2 := SetAutoStartIn{AppName: appName, AutoStart: true} +// in3 := SetAutoStartIn{AppName: appName, AutoStart: false} +// +// // Test with RPC Server +// +// err := gateway.SetAutoStart(&in1, &struct{}{}) +// require.Error(t, err) +// assert.Equal(t, ErrUnknownApp, err) +// +// require.NoError(t, gateway.SetAutoStart(&in2, &struct{}{})) +// assert.True(t, node.appsConf[0].AutoStart) +// +// require.NoError(t, gateway.SetAutoStart(&in3, &struct{}{})) +// assert.False(t, node.appsConf[0].AutoStart) +// +// // Test with RPC Client +// +// err = client.SetAutoStart(in1.AppName, in1.AutoStart) +// require.Error(t, err) +// assert.Equal(t, ErrUnknownApp.Error(), err.Error()) +// +// require.NoError(t, client.SetAutoStart(in2.AppName, in2.AutoStart)) +// assert.True(t, node.appsConf[0].AutoStart) +// +// require.NoError(t, client.SetAutoStart(in3.AppName, in3.AutoStart)) +// assert.False(t, node.appsConf[0].AutoStart) +// }) +// +// t.Run("TransportTypes", func(t *testing.T) { +// in := TransportsIn{ShowLogs: true} +// +// var out []*TransportSummary +// require.NoError(t, gateway.Transports(&in, &out)) +// require.Len(t, out, 1) +// assert.Equal(t, "mock", out[0].Type) +// +// out2, err := client.Transports(in.FilterTypes, in.FilterPubKeys, in.ShowLogs) +// require.NoError(t, err) +// assert.Equal(t, out, out2) +// }) +// +// t.Run("Transport", func(t *testing.T) { +// var ids []uuid.UUID +// node.tm.WalkTransports(func(tp *transport.ManagedTransport) bool { +// ids = append(ids, tp.Entry.ID) +// return true +// }) +// +// for _, id := range ids { +// id := id +// var summary TransportSummary +// require.NoError(t, gateway.Transport(&id, &summary)) +// +// summary2, err := client.Transport(id) +// require.NoError(t, err) +// require.Equal(t, summary, *summary2) +// } +// }) +// +// // TODO: Test add/remove transports +// +//} diff --git a/pkg/visor/visor.go b/pkg/visor/visor.go index 79da9dd209..141ae42f7d 100644 --- a/pkg/visor/visor.go +++ b/pkg/visor/visor.go @@ -23,7 +23,6 @@ import ( "github.com/skycoin/dmsg" "github.com/skycoin/dmsg/cipher" - "github.com/skycoin/dmsg/noise" "github.com/skycoin/skycoin/src/util/logging" diff --git a/pkg/visor/visor_test.go b/pkg/visor/visor_test.go index 6f194cd035..39447cfae2 100644 --- a/pkg/visor/visor_test.go +++ b/pkg/visor/visor_test.go @@ -11,13 +11,16 @@ import ( "testing" "time" + "github.com/skycoin/dmsg" "github.com/skycoin/dmsg/cipher" + "github.com/skycoin/dmsg/disc" "github.com/skycoin/skycoin/src/util/logging" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "github.com/skycoin/skywire/pkg/app" "github.com/skycoin/skywire/pkg/routing" + "github.com/skycoin/skywire/pkg/snet" "github.com/skycoin/skywire/pkg/transport" "github.com/skycoin/skywire/pkg/util/pathutil" ) @@ -41,42 +44,39 @@ func TestMain(m *testing.M) { } // TODO(nkryuchkov): fix and uncomment -/* -func TestNewNode(t *testing.T) { - pk, sk := cipher.GenerateKeyPair() - srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - require.NoError(t, 
json.NewEncoder(w).Encode(&httpauth.NextNonceResponse{Edge: pk, NextNonce: 1})) - })) - defer srv.Close() - - conf := Config{Version: "1.0", LocalPath: "local", AppsPath: "apps"} - conf.Node.StaticPubKey = pk - conf.Node.StaticSecKey = sk - conf.Messaging.Discovery = "http://skywire.skycoin.net:8001" - conf.Messaging.ServerCount = 10 - conf.Transport.Discovery = srv.URL - conf.Apps = []AppConfig{ - {App: "foo", Version: "1.1", Port: 1}, - {App: "bar", AutoStart: true, Port: 2}, - } - - defer func() { - require.NoError(t, os.RemoveAll("local")) - }() - - node, err := NewNode(&conf, masterLogger) - require.NoError(t, err) - - assert.NotNil(t, node.router) - assert.NotNil(t, node.appsConf) - assert.NotNil(t, node.appsPath) - assert.NotNil(t, node.localPath) - assert.NotNil(t, node.startedApps) -} -*/ - -// TODO(Darkren): fix test -/*func TestNodeStartClose(t *testing.T) { +//func TestNewNode(t *testing.T) { +// pk, sk := cipher.GenerateKeyPair() +// srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { +// require.NoError(t, json.NewEncoder(w).Encode(&httpauth.NextNonceResponse{Edge: pk, NextNonce: 1})) +// })) +// defer srv.Close() +// +// conf := Config{Version: "1.0", LocalPath: "local", AppsPath: "apps"} +// conf.Node.StaticPubKey = pk +// conf.Node.StaticSecKey = sk +// conf.Messaging.Discovery = "http://skywire.skycoin.net:8001" +// conf.Messaging.ServerCount = 10 +// conf.Transport.Discovery = srv.URL +// conf.Apps = []AppConfig{ +// {App: "foo", Version: "1.1", Port: 1}, +// {App: "bar", AutoStart: true, Port: 2}, +// } +// +// defer func() { +// require.NoError(t, os.RemoveAll("local")) +// }() +// +// node, err := NewNode(&conf, masterLogger) +// require.NoError(t, err) +// +// assert.NotNil(t, node.router) +// assert.NotNil(t, node.appsConf) +// assert.NotNil(t, node.appsPath) +// assert.NotNil(t, node.localPath) +// assert.NotNil(t, node.startedApps) +//} + +func TestNodeStartClose(t *testing.T) { r := new(mockRouter) executer := &MockExecuter{} conf := []AppConfig{ @@ -90,13 +90,21 @@ func TestNewNode(t *testing.T) { node := &Node{config: &Config{}, router: r, executer: executer, appsConf: conf, startedApps: map[string]*appBind{}, logger: logging.MustGetLogger("test")} - mConf := &dmsg.Config{PubKey: cipher.PubKey{}, SecKey: cipher.SecKey{}, Discovery: disc.NewMock()} - node.messenger = dmsg.NewClient(mConf.PubKey, mConf.SecKey, mConf.Discovery) - var err error + dmsgC := dmsg.NewClient(cipher.PubKey{}, cipher.SecKey{}, disc.NewMock()) + netConf := snet.Config{ + PubKey: cipher.PubKey{}, + SecKey: cipher.SecKey{}, + TpNetworks: nil, + DmsgDiscAddr: "", + DmsgMinSrvs: 0, + } + network := snet.NewRaw(netConf, dmsgC, nil) tmConf := &transport.ManagerConfig{PubKey: cipher.PubKey{}, DiscoveryClient: transport.NewDiscoveryMock()} - node.tm, err = transport.NewManager(tmConf, nil, node.messenger) + + tm, err := transport.NewManager(network, tmConf) + node.tm = tm require.NoError(t, err) errCh := make(chan error) @@ -112,7 +120,7 @@ func TestNewNode(t *testing.T) { require.Len(t, executer.cmds, 1) assert.Equal(t, "skychat.v1.0", executer.cmds[0].Path) assert.Equal(t, "skychat/v1.0", executer.cmds[0].Dir) -}*/ +} func TestNodeSpawnApp(t *testing.T) { pk, _ := cipher.GenerateKeyPair()