Skip to content

Commit

Permalink
Merge pull request #508 from evanlinjin/feature/decouple-managedTrans…
Browse files Browse the repository at this point in the history
…port

[WIP] Decouple managedTransport and improve transport handling logic.
  • Loading branch information
志宇 authored Aug 14, 2019
2 parents bb1f9e4 + 2fd452a commit d61a857
Show file tree
Hide file tree
Showing 30 changed files with 1,328 additions and 1,402 deletions.
4 changes: 2 additions & 2 deletions pkg/route-finder/client/mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,15 +34,15 @@ func (r *mockClient) PairedRoutes(src, dst cipher.PubKey, minHops, maxHops uint1
&routing.Hop{
From: src,
To: dst,
Transport: transport.MakeTransportID(src, dst, "", true),
Transport: transport.MakeTransportID(src, dst, ""),
},
},
}, []routing.Route{
{
&routing.Hop{
From: src,
To: dst,
Transport: transport.MakeTransportID(src, dst, "", true),
Transport: transport.MakeTransportID(src, dst, ""),
},
},
}, nil
Expand Down
33 changes: 18 additions & 15 deletions pkg/router/app_manager.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
package router

import (
"context"
"encoding/json"
"errors"
"time"

"github.com/skycoin/skycoin/src/util/logging"

Expand All @@ -13,9 +15,9 @@ import (
const supportedProtocolVersion = "0.0.1"

type appCallbacks struct {
CreateLoop func(conn *app.Protocol, raddr routing.Addr) (laddr routing.Addr, err error)
CloseLoop func(conn *app.Protocol, loop routing.Loop) error
Forward func(conn *app.Protocol, packet *app.Packet) error
CreateLoop func(ctx context.Context, conn *app.Protocol, raddr routing.Addr) (laddr routing.Addr, err error)
CloseLoop func(ctx context.Context, conn *app.Protocol, loop routing.Loop) error
Forward func(ctx context.Context, conn *app.Protocol, packet *app.Packet) error
}

type appManager struct {
Expand All @@ -29,15 +31,19 @@ type appManager struct {
func (am *appManager) Serve() error {
return am.proto.Serve(func(frame app.Frame, payload []byte) (res interface{}, err error) {
am.Logger.Infof("Got new App request with type %s: %s", frame, string(payload))

ctx, cancel := context.WithTimeout(context.Background(), time.Second*30)
defer cancel()

switch frame {
case app.FrameInit:
err = am.initApp(payload)
case app.FrameCreateLoop:
res, err = am.setupLoop(payload)
res, err = am.setupLoop(ctx, payload)
case app.FrameClose:
err = am.handleCloseLoop(payload)
err = am.handleCloseLoop(ctx, payload)
case app.FrameSend:
err = am.forwardAppPacket(payload)
err = am.forwardAppPacket(ctx, payload)
default:
err = errors.New("unexpected frame")
}
Expand Down Expand Up @@ -72,29 +78,26 @@ func (am *appManager) initApp(payload []byte) error {
return nil
}

func (am *appManager) setupLoop(payload []byte) (routing.Addr, error) {
func (am *appManager) setupLoop(ctx context.Context, payload []byte) (routing.Addr, error) {
var raddr routing.Addr
if err := json.Unmarshal(payload, &raddr); err != nil {
return routing.Addr{}, err
}

return am.callbacks.CreateLoop(am.proto, raddr)
return am.callbacks.CreateLoop(ctx, am.proto, raddr)
}

func (am *appManager) handleCloseLoop(payload []byte) error {
func (am *appManager) handleCloseLoop(ctx context.Context, payload []byte) error {
var loop routing.Loop
if err := json.Unmarshal(payload, &loop); err != nil {
return err
}

return am.callbacks.CloseLoop(am.proto, loop)
return am.callbacks.CloseLoop(ctx, am.proto, loop)
}

func (am *appManager) forwardAppPacket(payload []byte) error {
func (am *appManager) forwardAppPacket(ctx context.Context, payload []byte) error {
packet := &app.Packet{}
if err := json.Unmarshal(payload, packet); err != nil {
return err
}

return am.callbacks.Forward(am.proto, packet)
return am.callbacks.Forward(ctx, am.proto, packet)
}
7 changes: 4 additions & 3 deletions pkg/router/app_manager_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package router

import (
"context"
"net"
"testing"

Expand Down Expand Up @@ -66,7 +67,7 @@ func TestAppManagerSetupLoop(t *testing.T) {
app.NewProtocol(out),
&app.Config{AppName: "foo", AppVersion: "0.0.1"},
&appCallbacks{
CreateLoop: func(conn *app.Protocol, raddr routing.Addr) (laddr routing.Addr, err error) {
CreateLoop: func(ctx context.Context, conn *app.Protocol, raddr routing.Addr) (laddr routing.Addr, err error) {
return raddr, nil
},
},
Expand Down Expand Up @@ -102,7 +103,7 @@ func TestAppManagerCloseLoop(t *testing.T) {
app.NewProtocol(out),
&app.Config{AppName: "foo", AppVersion: "0.0.1"},
&appCallbacks{
CloseLoop: func(conn *app.Protocol, loop routing.Loop) error {
CloseLoop: func(ctx context.Context, conn *app.Protocol, loop routing.Loop) error {
inLoop = loop
return nil
},
Expand Down Expand Up @@ -139,7 +140,7 @@ func TestAppManagerForward(t *testing.T) {
app.NewProtocol(out),
&app.Config{AppName: "foo", AppVersion: "0.0.1"},
&appCallbacks{
Forward: func(conn *app.Protocol, packet *app.Packet) error {
Forward: func(ctx context.Context, conn *app.Protocol, packet *app.Packet) error {
inPacket = packet
return nil
},
Expand Down
6 changes: 6 additions & 0 deletions pkg/router/route_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,12 @@ func (rm *routeManager) GetRule(routeID routing.RouteID) (routing.Rule, error) {
return nil, errors.New("unknown RouteID")
}

// TODO(evanlinjin): This is a workaround for ensuring the read-in rule is of the correct size.
// Sometimes it is not, causing a segfault later down the line.
if len(rule) < routing.RuleHeaderSize {
return nil, errors.New("corrupted rule")
}

if rule.Expiry().Before(time.Now()) {
return nil, errors.New("expired routing rule")
}
Expand Down
9 changes: 5 additions & 4 deletions pkg/router/route_manager_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package router

import (
"context"
"net"
"testing"
"time"
Expand Down Expand Up @@ -81,7 +82,7 @@ func TestRouteManagerAddRemoveRule(t *testing.T) {
proto := setup.NewSetupProtocol(in)

rule := routing.ForwardRule(time.Now(), 3, uuid.New())
id, err := setup.AddRule(proto, rule)
id, err := setup.AddRule(context.TODO(), proto, rule)
require.NoError(t, err)
assert.Equal(t, routing.RouteID(1), id)

Expand Down Expand Up @@ -111,7 +112,7 @@ func TestRouteManagerDeleteRules(t *testing.T) {
require.NoError(t, err)
assert.Equal(t, 1, rt.Count())

require.NoError(t, setup.DeleteRule(proto, id))
require.NoError(t, setup.DeleteRule(context.TODO(), proto, id))
assert.Equal(t, 0, rt.Count())

require.NoError(t, in.Close())
Expand Down Expand Up @@ -157,7 +158,7 @@ func TestRouteManagerConfirmLoop(t *testing.T) {
},
RouteID: 1,
}
err := setup.ConfirmLoop(proto, ld)
err := setup.ConfirmLoop(context.TODO(), proto, ld)
require.NoError(t, err)
assert.Equal(t, rule, inRule)
assert.Equal(t, routing.Port(2), inLoop.Local.Port)
Expand Down Expand Up @@ -207,7 +208,7 @@ func TestRouteManagerLoopClosed(t *testing.T) {
},
RouteID: 1,
}
require.NoError(t, setup.LoopClosed(proto, ld))
require.NoError(t, setup.LoopClosed(context.TODO(), proto, ld))
assert.Equal(t, routing.Port(2), inLoop.Local.Port)
assert.Equal(t, routing.Port(3), inLoop.Remote.Port)
assert.Equal(t, pk, inLoop.Remote.PubKey)
Expand Down
Loading

0 comments on commit d61a857

Please sign in to comment.