From 28c5ed2966e772d6493c0b4ab541954e19c5652b Mon Sep 17 00:00:00 2001 From: Nikita Kryuchkov Date: Fri, 4 Oct 2019 18:42:20 +0300 Subject: [PATCH] Minor fixes --- go.mod | 2 +- pkg/app/packet_test.go | 24 - pkg/app2/client_test.go | 2 +- pkg/app2/errors.go | 4 +- pkg/app2/id_manager.go | 3 +- pkg/app2/id_manager_util.go | 3 +- pkg/app2/listener_test.go | 2 +- pkg/app2/rpc_client_test.go | 6 +- pkg/app2/rpc_gateway.go | 2 +- pkg/app2/rpc_gateway_test.go | 4 +- pkg/router/route_group.go | 16 +- pkg/router/router.go | 41 +- pkg/router/router_test.go | 364 ++++++------ pkg/router/{gateway.go => rpc_gateway.go} | 14 +- pkg/router/rpc_gateway_test.go | 2 + pkg/routing/packet_test.go | 29 +- pkg/setup/node.go | 2 +- pkg/setup/node_test.go | 659 +++++++++++----------- pkg/setup/{gateway.go => rpc_gateway.go} | 10 +- pkg/visor/visor_test.go | 9 + 20 files changed, 613 insertions(+), 585 deletions(-) delete mode 100644 pkg/app/packet_test.go rename pkg/router/{gateway.go => rpc_gateway.go} (76%) create mode 100644 pkg/router/rpc_gateway_test.go rename pkg/setup/{gateway.go => rpc_gateway.go} (89%) diff --git a/go.mod b/go.mod index c941ff5438..a11abf0318 100644 --- a/go.mod +++ b/go.mod @@ -27,4 +27,4 @@ require ( ) // Uncomment for tests with alternate branches of 'dmsg' -replace github.com/skycoin/dmsg => ../dmsg +// replace github.com/skycoin/dmsg => ../dmsg diff --git a/pkg/app/packet_test.go b/pkg/app/packet_test.go deleted file mode 100644 index 7990a166b0..0000000000 --- a/pkg/app/packet_test.go +++ /dev/null @@ -1,24 +0,0 @@ -package app - -import ( - "fmt" - - "github.com/skycoin/dmsg/cipher" - - "github.com/skycoin/skywire/pkg/routing" -) - -func ExamplePacket() { - var lpk, rpk cipher.PubKey - laddr := routing.Addr{Port: 0, PubKey: lpk} - raddr := routing.Addr{Port: 0, PubKey: rpk} - loop := routing.Loop{Local: laddr, Remote: raddr} - - fmt.Println(raddr.Network()) - fmt.Printf("%v\n", raddr) - fmt.Printf("%v\n", loop) - - // Output: skywire - // 000000000000000000000000000000000000000000000000000000000000000000:0 - // 000000000000000000000000000000000000000000000000000000000000000000:0 <-> 000000000000000000000000000000000000000000000000000000000000000000:0 -} diff --git a/pkg/app2/client_test.go b/pkg/app2/client_test.go index 56e571cbfd..1dbb1e5135 100644 --- a/pkg/app2/client_test.go +++ b/pkg/app2/client_test.go @@ -1,9 +1,9 @@ package app2 import ( + "errors" "testing" - "github.com/pkg/errors" "github.com/skycoin/dmsg/cipher" "github.com/skycoin/skycoin/src/util/logging" "github.com/stretchr/testify/require" diff --git a/pkg/app2/errors.go b/pkg/app2/errors.go index 88653a613d..39b50ecd1a 100644 --- a/pkg/app2/errors.go +++ b/pkg/app2/errors.go @@ -1,6 +1,8 @@ package app2 -import "github.com/pkg/errors" +import ( + "errors" +) var ( // ErrPortAlreadyBound is being returned when trying to bind to the port diff --git a/pkg/app2/id_manager.go b/pkg/app2/id_manager.go index e6ea530da7..44b36ca636 100644 --- a/pkg/app2/id_manager.go +++ b/pkg/app2/id_manager.go @@ -1,10 +1,9 @@ package app2 import ( + "errors" "fmt" "sync" - - "github.com/pkg/errors" ) var ( diff --git a/pkg/app2/id_manager_util.go b/pkg/app2/id_manager_util.go index 174b293300..ea00fdc4ed 100644 --- a/pkg/app2/id_manager_util.go +++ b/pkg/app2/id_manager_util.go @@ -1,9 +1,8 @@ package app2 import ( + "errors" "net" - - "github.com/pkg/errors" ) // assertListener asserts that `v` is of type `net.Listener`. diff --git a/pkg/app2/listener_test.go b/pkg/app2/listener_test.go index 9ff53cf7b4..831be099ea 100644 --- a/pkg/app2/listener_test.go +++ b/pkg/app2/listener_test.go @@ -1,9 +1,9 @@ package app2 import ( + "errors" "testing" - "github.com/pkg/errors" "github.com/skycoin/dmsg/cipher" "github.com/skycoin/skycoin/src/util/logging" "github.com/stretchr/testify/require" diff --git a/pkg/app2/rpc_client_test.go b/pkg/app2/rpc_client_test.go index 2d3476780e..7ed88850fb 100644 --- a/pkg/app2/rpc_client_test.go +++ b/pkg/app2/rpc_client_test.go @@ -2,11 +2,11 @@ package app2 import ( "context" + "errors" "net" "net/rpc" "testing" - "github.com/pkg/errors" "github.com/skycoin/dmsg" "github.com/skycoin/dmsg/cipher" "github.com/skycoin/skycoin/src/util/logging" @@ -47,7 +47,7 @@ func TestRPCClient_Dial(t *testing.T) { dialCtx := context.Background() dialConn := dmsg.NewTransport(&MockConn{}, logging.MustGetLogger("dmsg_tp"), - dmsgLocal, dmsgRemote, 0, func() {}) + dmsgLocal, dmsgRemote, 0, func(_ uint16) {}) var noErr error n := &network.MockNetworker{} @@ -186,7 +186,7 @@ func TestRPCClient_Accept(t *testing.T) { Port: remotePort, } lisConn := dmsg.NewTransport(&MockConn{}, logging.MustGetLogger("dmsg_tp"), - dmsgLocal, dmsgRemote, 0, func() {}) + dmsgLocal, dmsgRemote, 0, func(_ uint16) {}) var noErr error lis := &MockListener{} diff --git a/pkg/app2/rpc_gateway.go b/pkg/app2/rpc_gateway.go index dd1387131c..b9473d375c 100644 --- a/pkg/app2/rpc_gateway.go +++ b/pkg/app2/rpc_gateway.go @@ -1,10 +1,10 @@ package app2 import ( + "errors" "fmt" "net" - "github.com/pkg/errors" "github.com/skycoin/skycoin/src/util/logging" "github.com/skycoin/skywire/pkg/app2/network" diff --git a/pkg/app2/rpc_gateway_test.go b/pkg/app2/rpc_gateway_test.go index 637100a740..4ad1cb9e58 100644 --- a/pkg/app2/rpc_gateway_test.go +++ b/pkg/app2/rpc_gateway_test.go @@ -2,12 +2,12 @@ package app2 import ( "context" + "errors" "math" "net" "strings" "testing" - "github.com/pkg/errors" "github.com/skycoin/dmsg" "github.com/skycoin/dmsg/cipher" "github.com/skycoin/skycoin/src/util/logging" @@ -29,7 +29,7 @@ func TestRPCGateway_Dial(t *testing.T) { localPort := routing.Port(100) dialCtx := context.Background() - dialConn := dmsg.NewTransport(nil, nil, dmsg.Addr{Port: uint16(localPort)}, dmsg.Addr{}, 0, func() {}) + dialConn := dmsg.NewTransport(nil, nil, dmsg.Addr{Port: uint16(localPort)}, dmsg.Addr{}, 0, func(_ uint16) {}) var dialErr error n := &network.MockNetworker{} diff --git a/pkg/router/route_group.go b/pkg/router/route_group.go index 1d598afc56..62338d884d 100644 --- a/pkg/router/route_group.go +++ b/pkg/router/route_group.go @@ -4,6 +4,7 @@ import ( "bytes" "context" "errors" + "io" "net" "sync" "time" @@ -41,6 +42,8 @@ type RouteGroup struct { // and push to the appropriate '(RouteGroup).readCh'. readCh chan []byte // push reads from Router readBuf bytes.Buffer // for read overflow + done chan struct{} + once sync.Once rt routing.Table } @@ -68,7 +71,12 @@ func (r *RouteGroup) Read(p []byte) (n int, err error) { return r.readBuf.Read(p) } - return ioutil.BufRead(&r.readBuf, <-r.readCh, p) + data, ok := <-r.readCh + if !ok { + return 0, io.ErrClosedPipe + } + + return ioutil.BufRead(&r.readBuf, data, p) } // Write writes payload to a RouteGroup @@ -102,7 +110,6 @@ func (r *RouteGroup) Write(p []byte) (n int, err error) { // - Delete all rules (ForwardRules and ConsumeRules) from routing table. // - Close all go channels. func (r *RouteGroup) Close() error { - r.mu.Lock() defer r.mu.Unlock() @@ -124,7 +131,10 @@ func (r *RouteGroup) Close() error { } r.rt.DelRules(routeIDs) - close(r.readCh) // TODO(nkryuchkov): close readCh properly + r.once.Do(func() { + close(r.done) + close(r.readCh) + }) return nil } diff --git a/pkg/router/router.go b/pkg/router/router.go index d6bc1b195f..49f3b5774c 100644 --- a/pkg/router/router.go +++ b/pkg/router/router.go @@ -133,7 +133,7 @@ func New(n *snet.Network, config *Config) (*router, error) { trustedNodes: trustedNodes, } - if err := r.rpcSrv.Register(NewGateway(r)); err != nil { + if err := r.rpcSrv.Register(NewRPCGateway(r)); err != nil { return nil, fmt.Errorf("failed to register RPC server") } @@ -276,7 +276,7 @@ func (r *router) handleTransportPacket(ctx context.Context, packet routing.Packe } desc := rule.RouteDescriptor() - rg, ok := r.rgs[desc] + rg, ok := r.routeGroup(desc) if !ok { return errors.New("route descriptor does not exist") } @@ -288,10 +288,23 @@ func (r *router) handleTransportPacket(ctx context.Context, packet routing.Packe switch t := rule.Type(); t { case routing.RuleForward, routing.RuleIntermediaryForward: return r.forwardPacket(ctx, packet.Payload(), rule) - default: - rg.readCh <- packet.Payload() - return nil + default: // TODO(nkryuchkov): try to simplify + select { + case <-rg.done: + return io.ErrClosedPipe + default: + rg.mu.Lock() + defer rg.mu.Unlock() + select { + case rg.readCh <- packet.Payload(): + return nil + case <-rg.done: + return io.ErrClosedPipe + } + + } } + } // GetRule gets routing rule. @@ -342,24 +355,6 @@ func (r *router) forwardPacket(ctx context.Context, payload []byte, rule routing return nil } -// func (r *router) consumePacket(payload []byte, rule routing.Rule) error { -// laddr := routing.Addr{Port: rule.RouteDescriptor().SrcPort()} -// raddr := routing.Addr{PubKey: rule.RouteDescriptor().DstPK(), Port: rule.RouteDescriptor().DstPort()} -// -// route := routing.Route{Desc: routing.NewRouteDescriptor(laddr.PubKey, raddr.PubKey, laddr.Port, raddr.Port)} -// p := &app.Packet{Desc: route.Desc, Payload: payload} -// b, err := r.pm.Get(rule.RouteDescriptor().SrcPort()) -// if err != nil { -// return err -// } -// if err := b.conn.Send(app.FrameSend, p, nil); err != nil { // TODO: Stuck here. -// return err -// } -// -// r.logger.Infof("Forwarded packet to App on Port %d", rule.RouteDescriptor().SrcPort()) -// return nil -// } - // RemoveRouteDescriptor removes loop rule. func (r *router) RemoveRouteDescriptor(desc routing.RouteDescriptor) { rules := r.rt.AllRules() diff --git a/pkg/router/router_test.go b/pkg/router/router_test.go index 82bca82a72..ebce2393d5 100644 --- a/pkg/router/router_test.go +++ b/pkg/router/router_test.go @@ -3,6 +3,7 @@ package router import ( "context" "fmt" + "net" "os" "testing" "time" @@ -15,6 +16,7 @@ import ( "github.com/skycoin/skywire/pkg/routefinder/rfclient" "github.com/skycoin/skywire/pkg/routing" + "github.com/skycoin/skywire/pkg/setup" "github.com/skycoin/skywire/pkg/snet" "github.com/skycoin/skywire/pkg/snet/snettest" "github.com/skycoin/skywire/pkg/transport" @@ -251,193 +253,193 @@ func TestRouter_Rules(t *testing.T) { }) // TEST: Ensure AddRule and DeleteRule requests from a SetupNode does as expected. - // t.Run("AddRemoveRule", func(t *testing.T) { - // clearRules() - // - // // Add/Remove rules multiple times. - // for i := 0; i < 5; i++ { - // // As setup connections close after a single request completes - // // So we need two pairs of connections. - // requestIDIn, requestIDOut := net.Pipe() - // addIn, addOut := net.Pipe() - // delIn, delOut := net.Pipe() - // errCh := make(chan error, 2) - // go func() { - // errCh <- r.handleSetupConn(requestIDOut) // Receive RequestRegistrationID request. - // errCh <- r.handleSetupConn(addOut) // Receive AddRule request. - // errCh <- r.handleSetupConn(delOut) // Receive DeleteRule request. - // close(errCh) - // }() - // - // // Emulate SetupNode sending RequestRegistrationID request. - // proto := setup.NewSetupProtocol(requestIDIn) - // ids, err := proto.ReserveRtIDs(context.TODO(), 1) - // require.NoError(t, err) - // - // // Emulate SetupNode sending AddRule request. - // rule := routing.IntermediaryForwardRule(10*time.Minute, ids[0], 3, uuid.New()) - // proto = setup.NewSetupProtocol(addIn) - // err = proto.AddRules(context.TODO(), []routing.Rule{rule}) - // require.NoError(t, err) - // - // // Check routing table state after AddRule. - // assert.Equal(t, 1, rt.Count()) - // r, err := rt.Rule(ids[0]) - // require.NoError(t, err) - // assert.Equal(t, rule, r) - // - // // Emulate SetupNode sending RemoveRule request. - // require.NoError(t, setup.DeleteRule(context.TODO(), setup.NewSetupProtocol(delIn), ids[0])) - // - // // Check routing table state after DeleteRule. - // assert.Equal(t, 0, rt.Count()) - // r, err = rt.Rule(ids[0]) - // assert.Error(t, err) - // assert.Nil(t, r) - // - // require.NoError(t, requestIDIn.Close()) - // require.NoError(t, addIn.Close()) - // require.NoError(t, delIn.Close()) - // for err := range errCh { - // require.NoError(t, err) - // } - // } - // }) + t.Run("AddRemoveRule", func(t *testing.T) { + clearRules() + + // Add/Remove rules multiple times. + for i := 0; i < 5; i++ { + // As setup connections close after a single request completes + // So we need two pairs of connections. + requestIDIn, requestIDOut := net.Pipe() + addIn, addOut := net.Pipe() + delIn, delOut := net.Pipe() + errCh := make(chan error, 2) + go func() { + errCh <- r.handleSetupConn(requestIDOut) // Receive RequestRegistrationID request. + errCh <- r.handleSetupConn(addOut) // Receive AddRule request. + errCh <- r.handleSetupConn(delOut) // Receive DeleteRule request. + close(errCh) + }() + + // Emulate SetupNode sending RequestRegistrationID request. + proto := setup.NewSetupProtocol(requestIDIn) + ids, err := proto.ReserveRtIDs(context.TODO(), 1) + require.NoError(t, err) + + // Emulate SetupNode sending AddRule request. + rule := routing.IntermediaryForwardRule(10*time.Minute, ids[0], 3, uuid.New()) + proto = setup.NewSetupProtocol(addIn) + err = proto.AddRules(context.TODO(), []routing.Rule{rule}) + require.NoError(t, err) + + // Check routing table state after AddRule. + assert.Equal(t, 1, rt.Count()) + r, err := rt.Rule(ids[0]) + require.NoError(t, err) + assert.Equal(t, rule, r) + + // Emulate SetupNode sending RemoveRule request. + require.NoError(t, setup.DeleteRule(context.TODO(), setup.NewSetupProtocol(delIn), ids[0])) + + // Check routing table state after DeleteRule. + assert.Equal(t, 0, rt.Count()) + r, err = rt.Rule(ids[0]) + assert.Error(t, err) + assert.Nil(t, r) + + require.NoError(t, requestIDIn.Close()) + require.NoError(t, addIn.Close()) + require.NoError(t, delIn.Close()) + for err := range errCh { + require.NoError(t, err) + } + } + }) // TEST: Ensure DeleteRule requests from SetupNode is handled properly. - // t.Run("DeleteRules", func(t *testing.T) { - // clearRules() - // - // in, out := net.Pipe() - // errCh := make(chan error, 1) - // go func() { - // errCh <- r.handleSetupConn(out) - // close(errCh) - // }() - // defer func() { - // require.NoError(t, in.Close()) - // require.NoError(t, <-errCh) - // }() - // - // proto := setup.NewSetupProtocol(in) - // - // id, err := r.rt.ReserveKey() - // require.NoError(t, err) - // - // rule := routing.IntermediaryForwardRule(10*time.Minute, id, 3, uuid.New()) - // - // err = r.rt.SaveRule(rule) - // require.NoError(t, err) - // - // assert.Equal(t, 1, rt.Count()) - // - // require.NoError(t, setup.DeleteRule(context.TODO(), proto, id)) - // assert.Equal(t, 0, rt.Count()) - // }) + t.Run("DeleteRules", func(t *testing.T) { + clearRules() + + in, out := net.Pipe() + errCh := make(chan error, 1) + go func() { + errCh <- r.handleSetupConn(out) + close(errCh) + }() + defer func() { + require.NoError(t, in.Close()) + require.NoError(t, <-errCh) + }() + + proto := setup.NewSetupProtocol(in) + + id, err := r.rt.ReserveKey() + require.NoError(t, err) + + rule := routing.IntermediaryForwardRule(10*time.Minute, id, 3, uuid.New()) + + err = r.rt.SaveRule(rule) + require.NoError(t, err) + + assert.Equal(t, 1, rt.Count()) + + require.NoError(t, setup.DeleteRule(context.TODO(), proto, id)) + assert.Equal(t, 0, rt.Count()) + }) // TEST: Ensure visorRoutesCreated request from SetupNode is handled properly. - // t.Run("RoutesCreated", func(t *testing.T) { - // clearRules() - // - // var inLoop routing.Loop - // var inRule routing.Rule - // - // r.OnRoutesCreated = func(loop routing.Loop, rule routing.Rule) (err error) { - // inLoop = loop - // inRule = rule - // return nil - // } - // defer func() { r.OnRoutesCreated = nil }() - // - // in, out := net.Pipe() - // errCh := make(chan error, 1) - // go func() { - // errCh <- r.handleSetupConn(out) - // close(errCh) - // }() - // defer func() { - // require.NoError(t, in.Close()) - // require.NoError(t, <-errCh) - // }() - // - // proto := setup.NewSetupProtocol(in) - // pk, _ := cipher.GenerateKeyPair() - // - // rule := routing.ConsumeRule(10*time.Minute, 2, pk, 2, 3) - // require.NoError(t, rt.SaveRule(rule)) - // - // rule = routing.IntermediaryForwardRule(10*time.Minute, 1, 3, uuid.New()) - // require.NoError(t, rt.SaveRule(rule)) - // - // ld := routing.LoopData{ - // Loop: routing.Loop{ - // Remote: routing.Addr{ - // PubKey: pk, - // Port: 3, - // }, - // Local: routing.Addr{ - // Port: 2, - // }, - // }, - // RouteID: 1, - // } - // err := proto.RoutesCreated(context.TODO(), ld) - // require.NoError(t, err) - // assert.Equal(t, rule, inRule) - // assert.Equal(t, routing.Port(2), inLoop.Local.Port) - // assert.Equal(t, routing.Port(3), inLoop.Remote.Port) - // assert.Equal(t, pk, inLoop.Remote.PubKey) - // }) + t.Run("RoutesCreated", func(t *testing.T) { + clearRules() + + var inLoop routing.Loop + var inRule routing.Rule + + r.OnRoutesCreated = func(loop routing.Loop, rule routing.Rule) (err error) { + inLoop = loop + inRule = rule + return nil + } + defer func() { r.OnRoutesCreated = nil }() + + in, out := net.Pipe() + errCh := make(chan error, 1) + go func() { + errCh <- r.handleSetupConn(out) + close(errCh) + }() + defer func() { + require.NoError(t, in.Close()) + require.NoError(t, <-errCh) + }() + + proto := setup.NewSetupProtocol(in) + pk, _ := cipher.GenerateKeyPair() + + rule := routing.ConsumeRule(10*time.Minute, 2, pk, 2, 3) + require.NoError(t, rt.SaveRule(rule)) + + rule = routing.IntermediaryForwardRule(10*time.Minute, 1, 3, uuid.New()) + require.NoError(t, rt.SaveRule(rule)) + + ld := routing.LoopData{ + Loop: routing.Loop{ + Remote: routing.Addr{ + PubKey: pk, + Port: 3, + }, + Local: routing.Addr{ + Port: 2, + }, + }, + RouteID: 1, + } + err := proto.RoutesCreated(context.TODO(), ld) + require.NoError(t, err) + assert.Equal(t, rule, inRule) + assert.Equal(t, routing.Port(2), inLoop.Local.Port) + assert.Equal(t, routing.Port(3), inLoop.Remote.Port) + assert.Equal(t, pk, inLoop.Remote.PubKey) + }) // TEST: Ensure LoopClosed request from SetupNode is handled properly. - // t.Run("LoopClosed", func(t *testing.T) { - // clearRules() - // - // var inLoop routing.Loop - // - // r.OnLoopClosed = func(loop routing.Loop) error { - // inLoop = loop - // return nil - // } - // defer func() { r.OnLoopClosed = nil }() - // - // in, out := net.Pipe() - // errCh := make(chan error, 1) - // go func() { - // errCh <- r.handleSetupConn(out) - // close(errCh) - // }() - // defer func() { - // require.NoError(t, in.Close()) - // require.NoError(t, <-errCh) - // }() - // - // proto := setup.NewSetupProtocol(in) - // pk, _ := cipher.GenerateKeyPair() - // - // rule := routing.ConsumeRule(10*time.Minute, 2, pk, 2, 3) - // require.NoError(t, rt.SaveRule(rule)) - // - // rule = routing.IntermediaryForwardRule(10*time.Minute, 1, 3, uuid.New()) - // require.NoError(t, rt.SaveRule(rule)) - // - // ld := routing.LoopData{ - // Loop: routing.Loop{ - // Remote: routing.Addr{ - // PubKey: pk, - // Port: 3, - // }, - // Local: routing.Addr{ - // Port: 2, - // }, - // }, - // RouteID: 1, - // } - // require.NoError(t, setup.LoopClosed(context.TODO(), proto, ld)) - // assert.Equal(t, routing.Port(2), inLoop.Local.Port) - // assert.Equal(t, routing.Port(3), inLoop.Remote.Port) - // assert.Equal(t, pk, inLoop.Remote.PubKey) - // }) + t.Run("LoopClosed", func(t *testing.T) { + clearRules() + + var inLoop routing.Loop + + r.OnLoopClosed = func(loop routing.Loop) error { + inLoop = loop + return nil + } + defer func() { r.OnLoopClosed = nil }() + + in, out := net.Pipe() + errCh := make(chan error, 1) + go func() { + errCh <- r.handleSetupConn(out) + close(errCh) + }() + defer func() { + require.NoError(t, in.Close()) + require.NoError(t, <-errCh) + }() + + proto := setup.NewSetupProtocol(in) + pk, _ := cipher.GenerateKeyPair() + + rule := routing.ConsumeRule(10*time.Minute, 2, pk, 2, 3) + require.NoError(t, rt.SaveRule(rule)) + + rule = routing.IntermediaryForwardRule(10*time.Minute, 1, 3, uuid.New()) + require.NoError(t, rt.SaveRule(rule)) + + ld := routing.LoopData{ + Loop: routing.Loop{ + Remote: routing.Addr{ + PubKey: pk, + Port: 3, + }, + Local: routing.Addr{ + Port: 2, + }, + }, + RouteID: 1, + } + require.NoError(t, setup.LoopClosed(context.TODO(), proto, ld)) + assert.Equal(t, routing.Port(2), inLoop.Local.Port) + assert.Equal(t, routing.Port(3), inLoop.Remote.Port) + assert.Equal(t, pk, inLoop.Remote.PubKey) + }) } type TestEnv struct { diff --git a/pkg/router/gateway.go b/pkg/router/rpc_gateway.go similarity index 76% rename from pkg/router/gateway.go rename to pkg/router/rpc_gateway.go index 841c48c195..7d63adee77 100644 --- a/pkg/router/gateway.go +++ b/pkg/router/rpc_gateway.go @@ -7,19 +7,19 @@ import ( "github.com/skycoin/skywire/pkg/setup" ) -type Gateway struct { +type RPCGateway struct { logger *logging.Logger - router *router // TODO(nkryuchkov): move part of Router methods to Gateway + router *router // TODO(nkryuchkov): move part of Router methods to RPCGateway } -func NewGateway(router *router) *Gateway { - return &Gateway{ +func NewRPCGateway(router *router) *RPCGateway { + return &RPCGateway{ logger: logging.MustGetLogger("router-gateway"), router: router, } } -func (r *Gateway) AddEdgeRules(rules routing.EdgeRules, ok *bool) error { +func (r *RPCGateway) AddEdgeRules(rules routing.EdgeRules, ok *bool) error { go func() { r.router.accept <- rules }() @@ -34,7 +34,7 @@ func (r *Gateway) AddEdgeRules(rules routing.EdgeRules, ok *bool) error { return nil } -func (r *Gateway) AddIntermediaryRules(rules []routing.Rule, ok *bool) error { +func (r *RPCGateway) AddIntermediaryRules(rules []routing.Rule, ok *bool) error { if err := r.router.saveRoutingRules(rules...); err != nil { *ok = false r.logger.WithError(err).Warnf("Request completed with error.") @@ -45,7 +45,7 @@ func (r *Gateway) AddIntermediaryRules(rules []routing.Rule, ok *bool) error { return nil } -func (r *Gateway) ReserveIDs(n uint8, routeIDs *[]routing.RouteID) error { +func (r *RPCGateway) ReserveIDs(n uint8, routeIDs *[]routing.RouteID) error { ids, err := r.router.occupyRouteID(n) if err != nil { r.logger.WithError(err).Warnf("Request completed with error.") diff --git a/pkg/router/rpc_gateway_test.go b/pkg/router/rpc_gateway_test.go new file mode 100644 index 0000000000..cfe286e5b6 --- /dev/null +++ b/pkg/router/rpc_gateway_test.go @@ -0,0 +1,2 @@ +package router + diff --git a/pkg/routing/packet_test.go b/pkg/routing/packet_test.go index 8333e93db3..fe413f6bea 100644 --- a/pkg/routing/packet_test.go +++ b/pkg/routing/packet_test.go @@ -6,15 +6,32 @@ import ( "github.com/stretchr/testify/assert" ) -func TestMakePacket(t *testing.T) { +func TestMakeDataPacket(t *testing.T) { packet := MakeDataPacket(2, []byte("foo")) - assert.Equal( - t, - []byte{0x0, 0x3, 0x0, 0x0, 0x0, 0x2, 0x66, 0x6f, 0x6f}, - []byte(packet), - ) + expected := []byte{0x0, 0x0, 0x0, 0x0, 0x2, 0x0, 0x3, 0x66, 0x6f, 0x6f} + assert.Equal(t, expected, []byte(packet)) assert.Equal(t, uint16(3), packet.Size()) assert.Equal(t, RouteID(2), packet.RouteID()) assert.Equal(t, []byte("foo"), packet.Payload()) } + +func TestMakeClosePacket(t *testing.T) { + packet := MakeClosePacket(3, CloseRequested) + expected := []byte{0x1, 0x0, 0x0, 0x0, 0x3, 0x0, 0x1, 0x0} + + assert.Equal(t, expected, []byte(packet)) + assert.Equal(t, uint16(1), packet.Size()) + assert.Equal(t, RouteID(3), packet.RouteID()) + assert.Equal(t, []byte{0x0}, packet.Payload()) +} + +func TestMakeKeepAlivePacket(t *testing.T) { + packet := MakeKeepAlivePacket(4) + expected := []byte{0x2, 0x0, 0x0, 0x0, 0x4, 0x0, 0x0} + + assert.Equal(t, expected, []byte(packet)) + assert.Equal(t, uint16(0), packet.Size()) + assert.Equal(t, RouteID(4), packet.RouteID()) + assert.Equal(t, []byte{}, packet.Payload()) +} diff --git a/pkg/setup/node.go b/pkg/setup/node.go index a65e161996..612d042f8e 100644 --- a/pkg/setup/node.go +++ b/pkg/setup/node.go @@ -83,7 +83,7 @@ func (sn *Node) Serve() error { sn.logger.WithField("requester", conn.RemotePK()).Infof("Received request.") rpcS := rpc.NewServer() - if err := rpcS.Register(NewGateway(conn.RemotePK(), sn)); err != nil { + if err := rpcS.Register(NewRPCGateway(conn.RemotePK(), sn)); err != nil { return err } go rpcS.ServeConn(conn) diff --git a/pkg/setup/node_test.go b/pkg/setup/node_test.go index 2ad099075b..b07369ecf6 100644 --- a/pkg/setup/node_test.go +++ b/pkg/setup/node_test.go @@ -3,11 +3,28 @@ package setup import ( + "context" + "encoding/json" + "errors" + "fmt" "log" "os" + "sync" + "sync/atomic" "testing" + "time" + "github.com/google/uuid" + "github.com/skycoin/dmsg" + "github.com/skycoin/dmsg/cipher" + "github.com/skycoin/dmsg/disc" "github.com/skycoin/skycoin/src/util/logging" + "github.com/stretchr/testify/require" + "golang.org/x/net/nettest" + + "github.com/skycoin/skywire/pkg/metrics" + "github.com/skycoin/skywire/pkg/routing" + "github.com/skycoin/skywire/pkg/snet" ) func TestMain(m *testing.M) { @@ -40,324 +57,324 @@ func TestMain(m *testing.M) { // 3. Hanging may be not the problem of the DMSG. Probably some of the communication part here is wrong. // The reason I think so is that - if we ensure read timeouts, why doesn't this test constantly fail? // Maybe some wrapper for DMSG is wrong, or some internal operations before the actual communication behave bad -// func TestNode(t *testing.T) { -// // Prepare mock dmsg discovery. -// discovery := disc.NewMock() -// -// // Prepare dmsg server. -// server, serverErr := createServer(t, discovery) -// defer func() { -// require.NoError(t, server.Close()) -// require.NoError(t, errWithTimeout(serverErr)) -// }() -// -// type clientWithDMSGAddrAndListener struct { -// *dmsg.Client -// Addr dmsg.Addr -// Listener *dmsg.Listener -// } -// -// // CLOSURE: sets up dmsg clients. -// prepClients := func(n int) ([]clientWithDMSGAddrAndListener, func()) { -// clients := make([]clientWithDMSGAddrAndListener, n) -// for i := 0; i < n; i++ { -// var port uint16 -// // setup node -// if i == 0 { -// port = snet.SetupPort -// } else { -// port = snet.AwaitSetupPort -// } -// pk, sk, err := cipher.GenerateDeterministicKeyPair([]byte{byte(i)}) -// require.NoError(t, err) -// t.Logf("client[%d] PK: %s\n", i, pk) -// c := dmsg.NewClient(pk, sk, discovery, dmsg.SetLogger(logging.MustGetLogger(fmt.Sprintf("client_%d:%s:%d", i, pk, port)))) -// require.NoError(t, c.InitiateServerConnections(context.TODO(), 1)) -// listener, err := c.Listen(port) -// require.NoError(t, err) -// clients[i] = clientWithDMSGAddrAndListener{ -// Client: c, -// Addr: dmsg.Addr{ -// PK: pk, -// Port: port, -// }, -// Listener: listener, -// } -// } -// return clients, func() { -// for _, c := range clients { -// //require.NoError(t, c.Listener.Close()) -// require.NoError(t, c.Close()) -// } -// } -// } -// -// // CLOSURE: sets up setup node. -// prepSetupNode := func(c *dmsg.Client, listener *dmsg.Listener) (*Node, func()) { -// sn := &Node{ -// logger: logging.MustGetLogger("setup_node"), -// dmsgC: c, -// dmsgL: listener, -// metrics: metrics.NewDummy(), -// } -// go func() { -// if err := sn.Serve(); err != nil { -// sn.logger.WithError(err).Error("Failed to serve") -// } -// }() -// return sn, func() { -// require.NoError(t, sn.Close()) -// } -// } -// -//// TEST: Emulates the communication between 4 visor nodes and a setup node, -//// where the first client node initiates a loop to the last. -//t.Run("CreateRoutes", func(t *testing.T) { -// // client index 0 is for setup node. -// // clients index 1 to 4 are for visor nodes. -// clients, closeClients := prepClients(5) -// defer closeClients() -// -// // prepare and serve setup node (using client 0). -// _, closeSetup := prepSetupNode(clients[0].Client, clients[0].Listener) -// setupPK := clients[0].Addr.PK -// setupPort := clients[0].Addr.Port -// defer closeSetup() -// -// // prepare loop creation (client_1 will use this to request loop creation with setup node). -// ld := routing.LoopDescriptor{ -// Loop: routing.Loop{ -// Local: routing.Addr{PubKey: clients[1].Addr.PK, Port: 1}, -// Remote: routing.Addr{PubKey: clients[4].Addr.PK, Port: 1}, -// }, -// Reverse: routing.Route{ -// &routing.Hop{From: clients[1].Addr.PK, To: clients[2].Addr.PK, Transport: uuid.New()}, -// &routing.Hop{From: clients[2].Addr.PK, To: clients[3].Addr.PK, Transport: uuid.New()}, -// &routing.Hop{From: clients[3].Addr.PK, To: clients[4].Addr.PK, Transport: uuid.New()}, -// }, -// Forward: routing.Route{ -// &routing.Hop{From: clients[4].Addr.PK, To: clients[3].Addr.PK, Transport: uuid.New()}, -// &routing.Hop{From: clients[3].Addr.PK, To: clients[2].Addr.PK, Transport: uuid.New()}, -// &routing.Hop{From: clients[2].Addr.PK, To: clients[1].Addr.PK, Transport: uuid.New()}, -// }, -// KeepAlive: 1 * time.Hour, -// } -// -// // client_1 initiates loop creation with setup node. -// iTp, err := clients[1].Dial(context.TODO(), setupPK, setupPort) -// require.NoError(t, err) -// iTpErrs := make(chan error, 2) -// go func() { -// iTpErrs <- CreateRoutes(context.TODO(), NewSetupProtocol(iTp), ld) -// iTpErrs <- iTp.Close() -// close(iTpErrs) -// }() -// defer func() { -// i := 0 -// for err := range iTpErrs { -// require.NoError(t, err, i) -// i++ -// } -// }() -// -// var addRuleDone sync.WaitGroup -// var nextRouteID uint32 -// // CLOSURE: emulates how a visor node should react when expecting an AddRules packet. -// expectAddRules := func(client int, expRule routing.RuleType) { -// conn, err := clients[client].Listener.Accept() -// require.NoError(t, err) -// -// fmt.Printf("client %v:%v accepted\n", client, clients[client].Addr) -// -// proto := NewSetupProtocol(conn) -// -// pt, _, err := proto.ReadPacket() -// require.NoError(t, err) -// require.Equal(t, PacketRequestRouteID, pt) -// -// fmt.Printf("client %v:%v got PacketRequestRouteID\n", client, clients[client].Addr) -// -// routeID := atomic.AddUint32(&nextRouteID, 1) -// -// // TODO: This error is not checked due to a bug in dmsg. -// _ = proto.WritePacket(RespSuccess, []routing.RouteID{routing.RouteID(routeID)}) // nolint:errcheck -// require.NoError(t, err) -// -// fmt.Printf("client %v:%v responded to with registration ID: %v\n", client, clients[client].Addr, routeID) -// -// require.NoError(t, conn.Close()) -// -// conn, err = clients[client].Listener.Accept() -// require.NoError(t, err) -// -// fmt.Printf("client %v:%v accepted 2nd time\n", client, clients[client].Addr) -// -// proto = NewSetupProtocol(conn) -// -// pt, pp, err := proto.ReadPacket() -// require.NoError(t, err) -// require.Equal(t, PacketAddRules, pt) -// -// fmt.Printf("client %v:%v got PacketAddRules\n", client, clients[client].Addr) -// -// var rs []routing.Rule -// require.NoError(t, json.Unmarshal(pp, &rs)) -// -// for _, r := range rs { -// require.Equal(t, expRule, r.Type()) -// } -// -// // TODO: This error is not checked due to a bug in dmsg. -// err = proto.WritePacket(RespSuccess, nil) -// _ = err -// -// fmt.Printf("client %v:%v responded for PacketAddRules\n", client, clients[client].Addr) -// -// require.NoError(t, conn.Close()) -// -// addRuleDone.Done() -// } -// -// // CLOSURE: emulates how a visor node should react when expecting an OnConfirmLoop packet. -// expectConfirmLoop := func(client int) { -// tp, err := clients[client].Listener.AcceptTransport() -// require.NoError(t, err) -// -// proto := NewSetupProtocol(tp) -// -// pt, pp, err := proto.ReadPacket() -// require.NoError(t, err) -// require.Equal(t, PacketConfirmLoop, pt) -// -// var d routing.LoopData -// require.NoError(t, json.Unmarshal(pp, &d)) -// -// switch client { -// case 1: -// require.Equal(t, ld.Loop, d.Loop) -// case 4: -// require.Equal(t, ld.Loop.Local, d.Loop.Remote) -// require.Equal(t, ld.Loop.Remote, d.Loop.Local) -// default: -// t.Fatalf("We shouldn't be receiving a OnConfirmLoop packet from client %d", client) -// } -// -// // TODO: This error is not checked due to a bug in dmsg. -// err = proto.WritePacket(RespSuccess, nil) -// _ = err -// -// require.NoError(t, tp.Close()) -// } -// -// // since the route establishment is asynchronous, -// // we must expect all the messages in parallel -// addRuleDone.Add(4) -// go expectAddRules(4, routing.RuleApp) -// go expectAddRules(3, routing.RuleForward) -// go expectAddRules(2, routing.RuleForward) -// go expectAddRules(1, routing.RuleForward) -// addRuleDone.Wait() -// fmt.Println("FORWARD ROUTE DONE") -// addRuleDone.Add(4) -// go expectAddRules(1, routing.RuleApp) -// go expectAddRules(2, routing.RuleForward) -// go expectAddRules(3, routing.RuleForward) -// go expectAddRules(4, routing.RuleForward) -// addRuleDone.Wait() -// fmt.Println("REVERSE ROUTE DONE") -// expectConfirmLoop(1) -// expectConfirmLoop(4) -//}) -// -// TEST: Emulates the communication between 2 visor nodes and a setup nodes, -// where a route is already established, -// and the first client attempts to tear it down. -// t.Run("CloseLoop", func(t *testing.T) { -// // client index 0 is for setup node. -// // clients index 1 and 2 are for visor nodes. -// clients, closeClients := prepClients(3) -// defer closeClients() -// -// // prepare and serve setup node. -// _, closeSetup := prepSetupNode(clients[0].Client, clients[0].Listener) -// setupPK := clients[0].Addr.PK -// setupPort := clients[0].Addr.Port -// defer closeSetup() -// -// // prepare loop data describing the loop that is to be closed. -// ld := routing.LoopData{ -// Loop: routing.Loop{ -// Local: routing.Addr{ -// PubKey: clients[1].Addr.PK, -// Port: 1, -// }, -// Remote: routing.Addr{ -// PubKey: clients[2].Addr.PK, -// Port: 2, -// }, -// }, -// RouteID: 3, -// } -// -// // client_1 initiates close loop with setup node. -// iTp, err := clients[1].Dial(context.TODO(), setupPK, setupPort) -// require.NoError(t, err) -// iTpErrs := make(chan error, 2) -// go func() { -// iTpErrs <- CloseLoop(context.TODO(), NewSetupProtocol(iTp), ld) -// iTpErrs <- iTp.Close() -// close(iTpErrs) -// }() -// defer func() { -// i := 0 -// for err := range iTpErrs { -// require.NoError(t, err, i) -// i++ -// } -// }() -// -// // client_2 accepts close request. -// tp, err := clients[2].Listener.AcceptTransport() -// require.NoError(t, err) -// defer func() { require.NoError(t, tp.Close()) }() -// -// proto := NewSetupProtocol(tp) -// -// pt, pp, err := proto.ReadPacket() -// require.NoError(t, err) -// require.Equal(t, PacketLoopClosed, pt) -// -// var d routing.LoopData -// require.NoError(t, json.Unmarshal(pp, &d)) -// require.Equal(t, ld.Loop.Remote, d.Loop.Local) -// require.Equal(t, ld.Loop.Local, d.Loop.Remote) -// -// // TODO: This error is not checked due to a bug in dmsg. -// err = proto.WritePacket(RespSuccess, nil) -// _ = err -// }) -// } -// -// func createServer(t *testing.T, dc disc.APIClient) (srv *dmsg.Server, srvErr <-chan error) { -// pk, sk, err := cipher.GenerateDeterministicKeyPair([]byte("s")) -// require.NoError(t, err) -// l, err := nettest.NewLocalListener("tcp") -// require.NoError(t, err) -// srv, err = dmsg.NewServer(pk, sk, "", l, dc) -// require.NoError(t, err) -// errCh := make(chan error, 1) -// go func() { -// errCh <- srv.Serve() -// close(errCh) -// }() -// return srv, errCh -// } -// -// func errWithTimeout(ch <-chan error) error { -// select { -// case err := <-ch: -// return err -// case <-time.After(5 * time.Second): -// return errors.New("timeout") -// } -// } +func TestNode(t *testing.T) { + // Prepare mock dmsg discovery. + discovery := disc.NewMock() + + // Prepare dmsg server. + server, serverErr := createServer(t, discovery) + defer func() { + require.NoError(t, server.Close()) + require.NoError(t, errWithTimeout(serverErr)) + }() + + type clientWithDMSGAddrAndListener struct { + *dmsg.Client + Addr dmsg.Addr + Listener *dmsg.Listener + } + + // CLOSURE: sets up dmsg clients. + prepClients := func(n int) ([]clientWithDMSGAddrAndListener, func()) { + clients := make([]clientWithDMSGAddrAndListener, n) + for i := 0; i < n; i++ { + var port uint16 + // setup node + if i == 0 { + port = snet.SetupPort + } else { + port = snet.AwaitSetupPort + } + pk, sk, err := cipher.GenerateDeterministicKeyPair([]byte{byte(i)}) + require.NoError(t, err) + t.Logf("client[%d] PK: %s\n", i, pk) + c := dmsg.NewClient(pk, sk, discovery, dmsg.SetLogger(logging.MustGetLogger(fmt.Sprintf("client_%d:%s:%d", i, pk, port)))) + require.NoError(t, c.InitiateServerConnections(context.TODO(), 1)) + listener, err := c.Listen(port) + require.NoError(t, err) + clients[i] = clientWithDMSGAddrAndListener{ + Client: c, + Addr: dmsg.Addr{ + PK: pk, + Port: port, + }, + Listener: listener, + } + } + return clients, func() { + for _, c := range clients { + //require.NoError(t, c.Listener.Close()) + require.NoError(t, c.Close()) + } + } + } + + // CLOSURE: sets up setup node. + prepSetupNode := func(c *dmsg.Client, listener *dmsg.Listener) (*Node, func()) { + sn := &Node{ + logger: logging.MustGetLogger("setup_node"), + dmsgC: c, + dmsgL: listener, + metrics: metrics.NewDummy(), + } + go func() { + if err := sn.Serve(); err != nil { + sn.logger.WithError(err).Error("Failed to serve") + } + }() + return sn, func() { + require.NoError(t, sn.Close()) + } + } + + // TEST: Emulates the communication between 4 visor nodes and a setup node, + // where the first client node initiates a loop to the last. + t.Run("CreateRoutes", func(t *testing.T) { + // client index 0 is for setup node. + // clients index 1 to 4 are for visor nodes. + clients, closeClients := prepClients(5) + defer closeClients() + + // prepare and serve setup node (using client 0). + _, closeSetup := prepSetupNode(clients[0].Client, clients[0].Listener) + setupPK := clients[0].Addr.PK + setupPort := clients[0].Addr.Port + defer closeSetup() + + // prepare loop creation (client_1 will use this to request loop creation with setup node). + ld := routing.LoopDescriptor{ + Loop: routing.Loop{ + Local: routing.Addr{PubKey: clients[1].Addr.PK, Port: 1}, + Remote: routing.Addr{PubKey: clients[4].Addr.PK, Port: 1}, + }, + Reverse: routing.Route{ + &routing.Hop{From: clients[1].Addr.PK, To: clients[2].Addr.PK, Transport: uuid.New()}, + &routing.Hop{From: clients[2].Addr.PK, To: clients[3].Addr.PK, Transport: uuid.New()}, + &routing.Hop{From: clients[3].Addr.PK, To: clients[4].Addr.PK, Transport: uuid.New()}, + }, + Forward: routing.Route{ + &routing.Hop{From: clients[4].Addr.PK, To: clients[3].Addr.PK, Transport: uuid.New()}, + &routing.Hop{From: clients[3].Addr.PK, To: clients[2].Addr.PK, Transport: uuid.New()}, + &routing.Hop{From: clients[2].Addr.PK, To: clients[1].Addr.PK, Transport: uuid.New()}, + }, + KeepAlive: 1 * time.Hour, + } + + // client_1 initiates loop creation with setup node. + iTp, err := clients[1].Dial(context.TODO(), setupPK, setupPort) + require.NoError(t, err) + iTpErrs := make(chan error, 2) + go func() { + iTpErrs <- CreateRoutes(context.TODO(), NewSetupProtocol(iTp), ld) + iTpErrs <- iTp.Close() + close(iTpErrs) + }() + defer func() { + i := 0 + for err := range iTpErrs { + require.NoError(t, err, i) + i++ + } + }() + + var addRuleDone sync.WaitGroup + var nextRouteID uint32 + // CLOSURE: emulates how a visor node should react when expecting an AddRules packet. + expectAddRules := func(client int, expRule routing.RuleType) { + conn, err := clients[client].Listener.Accept() + require.NoError(t, err) + + fmt.Printf("client %v:%v accepted\n", client, clients[client].Addr) + + proto := NewSetupProtocol(conn) + + pt, _, err := proto.ReadPacket() + require.NoError(t, err) + require.Equal(t, PacketRequestRouteID, pt) + + fmt.Printf("client %v:%v got PacketRequestRouteID\n", client, clients[client].Addr) + + routeID := atomic.AddUint32(&nextRouteID, 1) + + // TODO: This error is not checked due to a bug in dmsg. + _ = proto.WritePacket(RespSuccess, []routing.RouteID{routing.RouteID(routeID)}) // nolint:errcheck + require.NoError(t, err) + + fmt.Printf("client %v:%v responded to with registration ID: %v\n", client, clients[client].Addr, routeID) + + require.NoError(t, conn.Close()) + + conn, err = clients[client].Listener.Accept() + require.NoError(t, err) + + fmt.Printf("client %v:%v accepted 2nd time\n", client, clients[client].Addr) + + proto = NewSetupProtocol(conn) + + pt, pp, err := proto.ReadPacket() + require.NoError(t, err) + require.Equal(t, PacketAddRules, pt) + + fmt.Printf("client %v:%v got PacketAddRules\n", client, clients[client].Addr) + + var rs []routing.Rule + require.NoError(t, json.Unmarshal(pp, &rs)) + + for _, r := range rs { + require.Equal(t, expRule, r.Type()) + } + + // TODO: This error is not checked due to a bug in dmsg. + err = proto.WritePacket(RespSuccess, nil) + _ = err + + fmt.Printf("client %v:%v responded for PacketAddRules\n", client, clients[client].Addr) + + require.NoError(t, conn.Close()) + + addRuleDone.Done() + } + + // CLOSURE: emulates how a visor node should react when expecting an OnConfirmLoop packet. + expectConfirmLoop := func(client int) { + tp, err := clients[client].Listener.AcceptTransport() + require.NoError(t, err) + + proto := NewSetupProtocol(tp) + + pt, pp, err := proto.ReadPacket() + require.NoError(t, err) + require.Equal(t, PacketConfirmLoop, pt) + + var d routing.LoopData + require.NoError(t, json.Unmarshal(pp, &d)) + + switch client { + case 1: + require.Equal(t, ld.Loop, d.Loop) + case 4: + require.Equal(t, ld.Loop.Local, d.Loop.Remote) + require.Equal(t, ld.Loop.Remote, d.Loop.Local) + default: + t.Fatalf("We shouldn't be receiving a OnConfirmLoop packet from client %d", client) + } + + // TODO: This error is not checked due to a bug in dmsg. + err = proto.WritePacket(RespSuccess, nil) + _ = err + + require.NoError(t, tp.Close()) + } + + // since the route establishment is asynchronous, + // we must expect all the messages in parallel + addRuleDone.Add(4) + go expectAddRules(4, routing.RuleApp) + go expectAddRules(3, routing.RuleForward) + go expectAddRules(2, routing.RuleForward) + go expectAddRules(1, routing.RuleForward) + addRuleDone.Wait() + fmt.Println("FORWARD ROUTE DONE") + addRuleDone.Add(4) + go expectAddRules(1, routing.RuleApp) + go expectAddRules(2, routing.RuleForward) + go expectAddRules(3, routing.RuleForward) + go expectAddRules(4, routing.RuleForward) + addRuleDone.Wait() + fmt.Println("REVERSE ROUTE DONE") + expectConfirmLoop(1) + expectConfirmLoop(4) + }) + + // TEST: Emulates the communication between 2 visor nodes and a setup nodes, + // where a route is already established, + // and the first client attempts to tear it down. + t.Run("CloseLoop", func(t *testing.T) { + // client index 0 is for setup node. + // clients index 1 and 2 are for visor nodes. + clients, closeClients := prepClients(3) + defer closeClients() + + // prepare and serve setup node. + _, closeSetup := prepSetupNode(clients[0].Client, clients[0].Listener) + setupPK := clients[0].Addr.PK + setupPort := clients[0].Addr.Port + defer closeSetup() + + // prepare loop data describing the loop that is to be closed. + ld := routing.LoopData{ + Loop: routing.Loop{ + Local: routing.Addr{ + PubKey: clients[1].Addr.PK, + Port: 1, + }, + Remote: routing.Addr{ + PubKey: clients[2].Addr.PK, + Port: 2, + }, + }, + RouteID: 3, + } + + // client_1 initiates close loop with setup node. + iTp, err := clients[1].Dial(context.TODO(), setupPK, setupPort) + require.NoError(t, err) + iTpErrs := make(chan error, 2) + go func() { + iTpErrs <- CloseLoop(context.TODO(), NewSetupProtocol(iTp), ld) + iTpErrs <- iTp.Close() + close(iTpErrs) + }() + defer func() { + i := 0 + for err := range iTpErrs { + require.NoError(t, err, i) + i++ + } + }() + + // client_2 accepts close request. + tp, err := clients[2].Listener.AcceptTransport() + require.NoError(t, err) + defer func() { require.NoError(t, tp.Close()) }() + + proto := NewSetupProtocol(tp) + + pt, pp, err := proto.ReadPacket() + require.NoError(t, err) + require.Equal(t, PacketLoopClosed, pt) + + var d routing.LoopData + require.NoError(t, json.Unmarshal(pp, &d)) + require.Equal(t, ld.Loop.Remote, d.Loop.Local) + require.Equal(t, ld.Loop.Local, d.Loop.Remote) + + // TODO: This error is not checked due to a bug in dmsg. + err = proto.WritePacket(RespSuccess, nil) + _ = err + }) +} + +func createServer(t *testing.T, dc disc.APIClient) (srv *dmsg.Server, srvErr <-chan error) { + pk, sk, err := cipher.GenerateDeterministicKeyPair([]byte("s")) + require.NoError(t, err) + l, err := nettest.NewLocalListener("tcp") + require.NoError(t, err) + srv, err = dmsg.NewServer(pk, sk, "", l, dc) + require.NoError(t, err) + errCh := make(chan error, 1) + go func() { + errCh <- srv.Serve() + close(errCh) + }() + return srv, errCh +} + +func errWithTimeout(ch <-chan error) error { + select { + case err := <-ch: + return err + case <-time.After(5 * time.Second): + return errors.New("timeout") + } +} diff --git a/pkg/setup/gateway.go b/pkg/setup/rpc_gateway.go similarity index 89% rename from pkg/setup/gateway.go rename to pkg/setup/rpc_gateway.go index 6eb2ccc1fb..5a276a815a 100644 --- a/pkg/setup/gateway.go +++ b/pkg/setup/rpc_gateway.go @@ -13,21 +13,21 @@ import ( "github.com/skycoin/skywire/pkg/routing" ) -type Gateway struct { +type RPCGateway struct { logger *logging.Logger reqPK cipher.PubKey sn *Node } -func NewGateway(reqPK cipher.PubKey, sn *Node) *Gateway { - return &Gateway{ +func NewRPCGateway(reqPK cipher.PubKey, sn *Node) *RPCGateway { + return &RPCGateway{ logger: logging.MustGetLogger("setup-gateway"), reqPK: reqPK, sn: sn, } } -func (g *Gateway) DialRouteGroup(route routing.BidirectionalRoute, rules *routing.EdgeRules) (failure error) { +func (g *RPCGateway) DialRouteGroup(route routing.BidirectionalRoute, rules *routing.EdgeRules) (failure error) { startTime := time.Now() defer func() { g.sn.metrics.Record(time.Since(startTime), failure != nil) @@ -107,7 +107,7 @@ func (g *Gateway) DialRouteGroup(route routing.BidirectionalRoute, rules *routin return nil } -func (g *Gateway) reserveRouteIDs(ctx context.Context, route routing.BidirectionalRoute) (*idReservoir, error) { +func (g *RPCGateway) reserveRouteIDs(ctx context.Context, route routing.BidirectionalRoute) (*idReservoir, error) { reservoir, total := newIDReservoir(route.Forward, route.Reverse) g.logger.Infof("There are %d route IDs to reserve.", total) diff --git a/pkg/visor/visor_test.go b/pkg/visor/visor_test.go index 5ae56cb7a9..fea4a3e624 100644 --- a/pkg/visor/visor_test.go +++ b/pkg/visor/visor_test.go @@ -19,6 +19,7 @@ import ( "github.com/stretchr/testify/require" "github.com/skycoin/skywire/pkg/app" + "github.com/skycoin/skywire/pkg/router" "github.com/skycoin/skywire/pkg/routing" "github.com/skycoin/skywire/pkg/snet" "github.com/skycoin/skywire/pkg/transport" @@ -259,6 +260,14 @@ type mockRouter struct { errChan chan error } +func (r *mockRouter) DialRoutes(ctx context.Context, rPK cipher.PubKey, lPort, rPort routing.Port, opts *router.DialOptions) (*router.RouteGroup, error) { + panic("implement me") +} + +func (r *mockRouter) AcceptRoutes() (*router.RouteGroup, error) { + panic("implement me") +} + func (r *mockRouter) Ports() []routing.Port { r.Lock() p := r.ports