Skip to content

Commit

Permalink
Improved transport handling logic.
Browse files Browse the repository at this point in the history
* Each transport now uses two underlying uni-directional connections. This allows for clearer responsibilities when re-establishing a transport.
* Moved much of the logic from `transport.Manager` to `managedTransport` to improve clarity.
* Settlement Handshake: only the responder updates the status to transport discovery.
* Removed the ability to register 'private' transport.
* Better use of `context.Context` throughout the codebase.
* Added timeouts to `setup` library to avoid SetupNode hanging.
* Changed various golang examples to proper tests.
* Cleaned up some logging.
* Cleaned up some tests.
* TODO: Some `router` tests are failing and have been commented out. These need to be fixed at a later stage.
  • Loading branch information
Evan Lin committed Aug 10, 2019
1 parent 19b57ef commit c5e1cd4
Show file tree
Hide file tree
Showing 24 changed files with 1,277 additions and 1,367 deletions.
4 changes: 2 additions & 2 deletions pkg/route-finder/client/mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,15 +34,15 @@ func (r *mockClient) PairedRoutes(src, dst cipher.PubKey, minHops, maxHops uint1
&routing.Hop{
From: src,
To: dst,
Transport: transport.MakeTransportID(src, dst, "", true),
Transport: transport.MakeTransportID(src, dst, ""),
},
},
}, []routing.Route{
{
&routing.Hop{
From: src,
To: dst,
Transport: transport.MakeTransportID(src, dst, "", true),
Transport: transport.MakeTransportID(src, dst, ""),
},
},
}, nil
Expand Down
33 changes: 18 additions & 15 deletions pkg/router/app_manager.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
package router

import (
"context"
"encoding/json"
"errors"
"time"

"github.com/skycoin/skycoin/src/util/logging"

Expand All @@ -13,9 +15,9 @@ import (
const supportedProtocolVersion = "0.0.1"

type appCallbacks struct {
CreateLoop func(conn *app.Protocol, raddr routing.Addr) (laddr routing.Addr, err error)
CloseLoop func(conn *app.Protocol, loop routing.Loop) error
Forward func(conn *app.Protocol, packet *app.Packet) error
CreateLoop func(ctx context.Context, conn *app.Protocol, raddr routing.Addr) (laddr routing.Addr, err error)
CloseLoop func(ctx context.Context, conn *app.Protocol, loop routing.Loop) error
Forward func(ctx context.Context, conn *app.Protocol, packet *app.Packet) error
}

type appManager struct {
Expand All @@ -29,15 +31,19 @@ type appManager struct {
func (am *appManager) Serve() error {
return am.proto.Serve(func(frame app.Frame, payload []byte) (res interface{}, err error) {
am.Logger.Infof("Got new App request with type %s: %s", frame, string(payload))

ctx, cancel := context.WithTimeout(context.Background(), time.Second*30)
defer cancel()

switch frame {
case app.FrameInit:
err = am.initApp(payload)
case app.FrameCreateLoop:
res, err = am.setupLoop(payload)
res, err = am.setupLoop(ctx, payload)
case app.FrameClose:
err = am.handleCloseLoop(payload)
err = am.handleCloseLoop(ctx, payload)
case app.FrameSend:
err = am.forwardAppPacket(payload)
err = am.forwardAppPacket(ctx, payload)
default:
err = errors.New("unexpected frame")
}
Expand Down Expand Up @@ -72,29 +78,26 @@ func (am *appManager) initApp(payload []byte) error {
return nil
}

func (am *appManager) setupLoop(payload []byte) (routing.Addr, error) {
func (am *appManager) setupLoop(ctx context.Context, payload []byte) (routing.Addr, error) {
var raddr routing.Addr
if err := json.Unmarshal(payload, &raddr); err != nil {
return routing.Addr{}, err
}

return am.callbacks.CreateLoop(am.proto, raddr)
return am.callbacks.CreateLoop(ctx, am.proto, raddr)
}

func (am *appManager) handleCloseLoop(payload []byte) error {
func (am *appManager) handleCloseLoop(ctx context.Context, payload []byte) error {
var loop routing.Loop
if err := json.Unmarshal(payload, &loop); err != nil {
return err
}

return am.callbacks.CloseLoop(am.proto, loop)
return am.callbacks.CloseLoop(ctx, am.proto, loop)
}

func (am *appManager) forwardAppPacket(payload []byte) error {
func (am *appManager) forwardAppPacket(ctx context.Context, payload []byte) error {
packet := &app.Packet{}
if err := json.Unmarshal(payload, packet); err != nil {
return err
}

return am.callbacks.Forward(am.proto, packet)
return am.callbacks.Forward(ctx, am.proto, packet)
}
7 changes: 4 additions & 3 deletions pkg/router/app_manager_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package router

import (
"context"
"net"
"testing"

Expand Down Expand Up @@ -66,7 +67,7 @@ func TestAppManagerSetupLoop(t *testing.T) {
app.NewProtocol(out),
&app.Config{AppName: "foo", AppVersion: "0.0.1"},
&appCallbacks{
CreateLoop: func(conn *app.Protocol, raddr routing.Addr) (laddr routing.Addr, err error) {
CreateLoop: func(ctx context.Context, conn *app.Protocol, raddr routing.Addr) (laddr routing.Addr, err error) {
return raddr, nil
},
},
Expand Down Expand Up @@ -102,7 +103,7 @@ func TestAppManagerCloseLoop(t *testing.T) {
app.NewProtocol(out),
&app.Config{AppName: "foo", AppVersion: "0.0.1"},
&appCallbacks{
CloseLoop: func(conn *app.Protocol, loop routing.Loop) error {
CloseLoop: func(ctx context.Context, conn *app.Protocol, loop routing.Loop) error {
inLoop = loop
return nil
},
Expand Down Expand Up @@ -139,7 +140,7 @@ func TestAppManagerForward(t *testing.T) {
app.NewProtocol(out),
&app.Config{AppName: "foo", AppVersion: "0.0.1"},
&appCallbacks{
Forward: func(conn *app.Protocol, packet *app.Packet) error {
Forward: func(ctx context.Context, conn *app.Protocol, packet *app.Packet) error {
inPacket = packet
return nil
},
Expand Down
9 changes: 5 additions & 4 deletions pkg/router/route_manager_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package router

import (
"context"
"net"
"testing"
"time"
Expand Down Expand Up @@ -81,7 +82,7 @@ func TestRouteManagerAddRemoveRule(t *testing.T) {
proto := setup.NewSetupProtocol(in)

rule := routing.ForwardRule(time.Now(), 3, uuid.New())
id, err := setup.AddRule(proto, rule)
id, err := setup.AddRule(context.TODO(), proto, rule)
require.NoError(t, err)
assert.Equal(t, routing.RouteID(1), id)

Expand Down Expand Up @@ -111,7 +112,7 @@ func TestRouteManagerDeleteRules(t *testing.T) {
require.NoError(t, err)
assert.Equal(t, 1, rt.Count())

require.NoError(t, setup.DeleteRule(proto, id))
require.NoError(t, setup.DeleteRule(context.TODO(), proto, id))
assert.Equal(t, 0, rt.Count())

require.NoError(t, in.Close())
Expand Down Expand Up @@ -157,7 +158,7 @@ func TestRouteManagerConfirmLoop(t *testing.T) {
},
RouteID: 1,
}
err := setup.ConfirmLoop(proto, ld)
err := setup.ConfirmLoop(context.TODO(), proto, ld)
require.NoError(t, err)
assert.Equal(t, rule, inRule)
assert.Equal(t, routing.Port(2), inLoop.Local.Port)
Expand Down Expand Up @@ -207,7 +208,7 @@ func TestRouteManagerLoopClosed(t *testing.T) {
},
RouteID: 1,
}
require.NoError(t, setup.LoopClosed(proto, ld))
require.NoError(t, setup.LoopClosed(context.TODO(), proto, ld))
assert.Equal(t, routing.Port(2), inLoop.Local.Port)
assert.Equal(t, routing.Port(3), inLoop.Remote.Port)
assert.Equal(t, pk, inLoop.Remote.PubKey)
Expand Down
Loading

0 comments on commit c5e1cd4

Please sign in to comment.