Skip to content

Commit

Permalink
Merge pull request #519 from Darkren/feature/make-route-creation-async
Browse files Browse the repository at this point in the history
Make route establishment asynchronous.
  • Loading branch information
志宇 authored Aug 22, 2019
2 parents 565b956 + 1cd7c13 commit 1aadc31
Show file tree
Hide file tree
Showing 13 changed files with 653 additions and 374 deletions.
4 changes: 2 additions & 2 deletions cmd/skywire-cli/commands/node/routes.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,13 +100,13 @@ var addRuleCmd = &cobra.Command{
remotePort = routing.Port(parseUint("remote-port", args[3], 16))
localPort = routing.Port(parseUint("local-port", args[4], 16))
)
rule = routing.AppRule(time.Now().Add(expire), routeID, remotePK, remotePort, localPort)
rule = routing.AppRule(time.Now().Add(expire), routeID, remotePK, remotePort, localPort, 0)
case "fwd":
var (
nextRouteID = routing.RouteID(parseUint("next-route-id", args[1], 32))
nextTpID = internal.ParseUUID("next-transport-id", args[2])
)
rule = routing.ForwardRule(time.Now().Add(expire), nextRouteID, nextTpID)
rule = routing.ForwardRule(time.Now().Add(expire), nextRouteID, nextTpID, 0)
}
rIDKey, err := rpcClient().AddRoutingRule(rule)
internal.Catch(err)
Expand Down
6 changes: 3 additions & 3 deletions pkg/router/managed_routing_table_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,13 @@ import (
func TestManagedRoutingTableCleanup(t *testing.T) {
rt := manageRoutingTable(routing.InMemoryRoutingTable())

_, err := rt.AddRule(routing.ForwardRule(time.Now().Add(time.Hour), 3, uuid.New()))
_, err := rt.AddRule(routing.ForwardRule(time.Now().Add(time.Hour), 3, uuid.New(), 1))
require.NoError(t, err)

id, err := rt.AddRule(routing.ForwardRule(time.Now().Add(-time.Hour), 3, uuid.New()))
id, err := rt.AddRule(routing.ForwardRule(time.Now().Add(-time.Hour), 3, uuid.New(), 2))
require.NoError(t, err)

id2, err := rt.AddRule(routing.ForwardRule(time.Now().Add(-time.Hour), 3, uuid.New()))
id2, err := rt.AddRule(routing.ForwardRule(time.Now().Add(-time.Hour), 3, uuid.New(), 3))
require.NoError(t, err)

assert.Equal(t, 3, rt.Count())
Expand Down
32 changes: 20 additions & 12 deletions pkg/router/route_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,6 @@ func (rm *routeManager) Close() error {
}

func (rm *routeManager) Serve() {

// Routing table garbage collect loop.
go rm.rtGarbageCollectLoop()

Expand Down Expand Up @@ -113,13 +112,15 @@ func (rm *routeManager) handleSetupConn(conn net.Conn) error {
var respBody interface{}
switch t {
case setup.PacketAddRules:
respBody, err = rm.addRoutingRules(body)
err = rm.setRoutingRules(body)
case setup.PacketDeleteRules:
respBody, err = rm.deleteRoutingRules(body)
case setup.PacketConfirmLoop:
err = rm.confirmLoop(body)
case setup.PacketLoopClosed:
err = rm.loopClosed(body)
case setup.PacketRequestRouteID:
respBody, err = rm.occupyRouteID()
default:
err = errors.New("unknown foundation packet")
}
Expand Down Expand Up @@ -214,24 +215,22 @@ func (rm *routeManager) RemoveLoopRule(loop routing.Loop) error {
return nil
}

func (rm *routeManager) addRoutingRules(data []byte) ([]routing.RouteID, error) {
func (rm *routeManager) setRoutingRules(data []byte) error {
var rules []routing.Rule
if err := json.Unmarshal(data, &rules); err != nil {
return nil, err
return err
}

res := make([]routing.RouteID, len(rules))
for idx, rule := range rules {
routeID, err := rm.rt.AddRule(rule)
if err != nil {
return nil, fmt.Errorf("routing table: %s", err)
for _, rule := range rules {
routeID := rule.RequestRouteID()
if err := rm.rt.SetRule(routeID, rule); err != nil {
return fmt.Errorf("routing table: %s", err)
}

res[idx] = routeID
rm.Logger.Infof("Added new Routing Rule with ID %d %s", routeID, rule)
rm.Logger.Infof("Set new Routing Rule with ID %d %s", routeID, rule)
}

return res, nil
return nil
}

func (rm *routeManager) deleteRoutingRules(data []byte) ([]routing.RouteID, error) {
Expand Down Expand Up @@ -307,3 +306,12 @@ func (rm *routeManager) loopClosed(data []byte) error {

return rm.conf.OnLoopClosed(ld.Loop)
}

func (rm *routeManager) occupyRouteID() ([]routing.RouteID, error) {
routeID, err := rm.rt.AddRule(nil)
if err != nil {
return nil, err
}

return []routing.RouteID{routeID}, nil
}
32 changes: 19 additions & 13 deletions pkg/router/route_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,11 +43,11 @@ func TestNewRouteManager(t *testing.T) {
t.Run("GetRule", func(t *testing.T) {
defer clearRules()

expiredRule := routing.ForwardRule(time.Now().Add(-10*time.Minute), 3, uuid.New())
expiredRule := routing.ForwardRule(time.Now().Add(-10*time.Minute), 3, uuid.New(), 1)
expiredID, err := rt.AddRule(expiredRule)
require.NoError(t, err)

rule := routing.ForwardRule(time.Now().Add(10*time.Minute), 3, uuid.New())
rule := routing.ForwardRule(time.Now().Add(10*time.Minute), 3, uuid.New(), 2)
id, err := rt.AddRule(rule)
require.NoError(t, err)

Expand All @@ -67,7 +67,7 @@ func TestNewRouteManager(t *testing.T) {
defer clearRules()

pk, _ := cipher.GenerateKeyPair()
rule := routing.AppRule(time.Now(), 3, pk, 3, 2)
rule := routing.AppRule(time.Now(), 3, pk, 3, 2, 1)
_, err := rt.AddRule(rule)
require.NoError(t, err)

Expand All @@ -86,28 +86,34 @@ func TestNewRouteManager(t *testing.T) {

// Add/Remove rules multiple times.
for i := 0; i < 5; i++ {

// As setup connections close after a single request completes
// So we need two pairs of connections.
requestIDIn, requestIDOut := net.Pipe()
addIn, addOut := net.Pipe()
delIn, delOut := net.Pipe()
errCh := make(chan error, 2)
go func() {
errCh <- rm.handleSetupConn(addOut) // Receive AddRule request.
errCh <- rm.handleSetupConn(delOut) // Receive DeleteRule request.
errCh <- rm.handleSetupConn(requestIDOut) // Receive RequestRegistrationID request.
errCh <- rm.handleSetupConn(addOut) // Receive AddRule request.
errCh <- rm.handleSetupConn(delOut) // Receive DeleteRule request.
close(errCh)
}()
defer func() {
require.NoError(t, requestIDIn.Close())
require.NoError(t, addIn.Close())
require.NoError(t, delIn.Close())
for err := range errCh {
require.NoError(t, err)
}
}()

// Emulate SetupNode sending RequestRegistrationID request.
id, err := setup.RequestRouteID(context.TODO(), setup.NewSetupProtocol(requestIDIn))
require.NoError(t, err)

// Emulate SetupNode sending AddRule request.
rule := routing.ForwardRule(time.Now(), 3, uuid.New())
id, err := setup.AddRule(context.TODO(), setup.NewSetupProtocol(addIn), rule)
rule := routing.ForwardRule(time.Now(), 3, uuid.New(), id)
err = setup.AddRule(context.TODO(), setup.NewSetupProtocol(addIn), rule)
require.NoError(t, err)

// Check routing table state after AddRule.
Expand Down Expand Up @@ -144,7 +150,7 @@ func TestNewRouteManager(t *testing.T) {

proto := setup.NewSetupProtocol(in)

rule := routing.ForwardRule(time.Now(), 3, uuid.New())
rule := routing.ForwardRule(time.Now(), 3, uuid.New(), 1)
id, err := rt.AddRule(rule)
require.NoError(t, err)
assert.Equal(t, 1, rt.Count())
Expand Down Expand Up @@ -180,10 +186,10 @@ func TestNewRouteManager(t *testing.T) {

proto := setup.NewSetupProtocol(in)
pk, _ := cipher.GenerateKeyPair()
rule := routing.AppRule(time.Now(), 3, pk, 3, 2)
rule := routing.AppRule(time.Now(), 3, pk, 3, 2, 2)
require.NoError(t, rt.SetRule(2, rule))

rule = routing.ForwardRule(time.Now(), 3, uuid.New())
rule = routing.ForwardRule(time.Now(), 3, uuid.New(), 1)
require.NoError(t, rt.SetRule(1, rule))

ld := routing.LoopData{
Expand Down Expand Up @@ -232,10 +238,10 @@ func TestNewRouteManager(t *testing.T) {
proto := setup.NewSetupProtocol(in)
pk, _ := cipher.GenerateKeyPair()

rule := routing.AppRule(time.Now(), 3, pk, 3, 2)
rule := routing.AppRule(time.Now(), 3, pk, 3, 2, 0)
require.NoError(t, rt.SetRule(2, rule))

rule = routing.ForwardRule(time.Now(), 3, uuid.New())
rule = routing.ForwardRule(time.Now(), 3, uuid.New(), 1)
require.NoError(t, rt.SetRule(1, rule))

ld := routing.LoopData{
Expand Down
2 changes: 1 addition & 1 deletion pkg/router/router_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ func TestRouter_Serve(t *testing.T) {
defer clearRules(r0, r1)

// Add a FWD rule for r0.
fwdRule := routing.ForwardRule(time.Now().Add(time.Hour), routing.RouteID(5), tp1.Entry.ID)
fwdRule := routing.ForwardRule(time.Now().Add(time.Hour), routing.RouteID(5), tp1.Entry.ID, routing.RouteID(0))
fwdRtID, err := r0.rm.rt.AddRule(fwdRule)
require.NoError(t, err)

Expand Down
4 changes: 2 additions & 2 deletions pkg/routing/routing_table_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ func TestMain(m *testing.M) {
func RoutingTableSuite(t *testing.T, tbl Table) {
t.Helper()

rule := ForwardRule(time.Now(), 2, uuid.New())
rule := ForwardRule(time.Now(), 2, uuid.New(), 1)
id, err := tbl.AddRule(rule)
require.NoError(t, err)

Expand All @@ -39,7 +39,7 @@ func RoutingTableSuite(t *testing.T, tbl Table) {
require.NoError(t, err)
assert.Equal(t, rule, r)

rule2 := ForwardRule(time.Now(), 3, uuid.New())
rule2 := ForwardRule(time.Now(), 3, uuid.New(), 2)
id2, err := tbl.AddRule(rule2)
require.NoError(t, err)

Expand Down
42 changes: 30 additions & 12 deletions pkg/routing/rule.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ func (r Rule) TransportID() uuid.UUID {
if r.Type() != RuleForward {
panic("invalid rule")
}
return uuid.Must(uuid.FromBytes(r[13:]))
return uuid.Must(uuid.FromBytes(r[13:29]))
}

// RemotePK returns remove PK for an app rule.
Expand Down Expand Up @@ -101,6 +101,18 @@ func (r Rule) LocalPort() Port {
return Port(binary.BigEndian.Uint16(r[48:]))
}

// RequestRouteID returns route ID which will be used to register this rule within
// the visor node.
func (r Rule) RequestRouteID() RouteID {
return RouteID(binary.BigEndian.Uint32(r[50:]))
}

// SetRequestRouteID sets the route ID which will be used to register this rule within
// the visor node.
func (r Rule) SetRequestRouteID(id RouteID) {
binary.BigEndian.PutUint32(r[50:], uint32(id))
}

func (r Rule) String() string {
if r.Type() == RuleApp {
return fmt.Sprintf("App: <resp-rid: %d><remote-pk: %s><remote-port: %d><local-port: %d>",
Expand All @@ -126,30 +138,32 @@ type RuleForwardFields struct {

// RuleSummary provides a summary of a RoutingRule.
type RuleSummary struct {
ExpireAt time.Time `json:"expire_at"`
Type RuleType `json:"rule_type"`
AppFields *RuleAppFields `json:"app_fields,omitempty"`
ForwardFields *RuleForwardFields `json:"forward_fields,omitempty"`
ExpireAt time.Time `json:"expire_at"`
Type RuleType `json:"rule_type"`
AppFields *RuleAppFields `json:"app_fields,omitempty"`
ForwardFields *RuleForwardFields `json:"forward_fields,omitempty"`
RequestRouteID RouteID `json:"request_route_id"`
}

// ToRule converts RoutingRuleSummary to RoutingRule.
func (rs *RuleSummary) ToRule() (Rule, error) {
if rs.Type == RuleApp && rs.AppFields != nil && rs.ForwardFields == nil {
f := rs.AppFields
return AppRule(rs.ExpireAt, f.RespRID, f.RemotePK, f.RemotePort, f.LocalPort), nil
return AppRule(rs.ExpireAt, f.RespRID, f.RemotePK, f.RemotePort, f.LocalPort, rs.RequestRouteID), nil
}
if rs.Type == RuleForward && rs.AppFields == nil && rs.ForwardFields != nil {
f := rs.ForwardFields
return ForwardRule(rs.ExpireAt, f.NextRID, f.NextTID), nil
return ForwardRule(rs.ExpireAt, f.NextRID, f.NextTID, rs.RequestRouteID), nil
}
return nil, errors.New("invalid routing rule summary")
}

// Summary returns the RoutingRule's summary.
func (r Rule) Summary() *RuleSummary {
summary := RuleSummary{
ExpireAt: r.Expiry(),
Type: r.Type(),
ExpireAt: r.Expiry(),
Type: r.Type(),
RequestRouteID: r.RequestRouteID(),
}
if summary.Type == RuleApp {
summary.AppFields = &RuleAppFields{
Expand All @@ -168,7 +182,8 @@ func (r Rule) Summary() *RuleSummary {
}

// AppRule constructs a new consume RoutingRule.
func AppRule(expireAt time.Time, respRoute RouteID, remotePK cipher.PubKey, remotePort, localPort Port) Rule {
func AppRule(expireAt time.Time, respRoute RouteID, remotePK cipher.PubKey, remotePort, localPort Port,
requestRouteID RouteID) Rule {
rule := make([]byte, RuleHeaderSize)
if expireAt.Unix() <= time.Now().Unix() {
binary.BigEndian.PutUint64(rule[0:], 0)
Expand All @@ -179,14 +194,15 @@ func AppRule(expireAt time.Time, respRoute RouteID, remotePK cipher.PubKey, remo
rule[8] = byte(RuleApp)
binary.BigEndian.PutUint32(rule[9:], uint32(respRoute))
rule = append(rule, remotePK[:]...)
rule = append(rule, 0, 0, 0, 0)
rule = append(rule, 0, 0, 0, 0, 0, 0, 0, 0)
binary.BigEndian.PutUint16(rule[46:], uint16(remotePort))
binary.BigEndian.PutUint16(rule[48:], uint16(localPort))
binary.BigEndian.PutUint32(rule[50:], uint32(requestRouteID))
return Rule(rule)
}

// ForwardRule constructs a new forward RoutingRule.
func ForwardRule(expireAt time.Time, nextRoute RouteID, nextTrID uuid.UUID) Rule {
func ForwardRule(expireAt time.Time, nextRoute RouteID, nextTrID uuid.UUID, requestRouteID RouteID) Rule {
rule := make([]byte, RuleHeaderSize)
if expireAt.Unix() <= time.Now().Unix() {
binary.BigEndian.PutUint64(rule[0:], 0)
Expand All @@ -197,5 +213,7 @@ func ForwardRule(expireAt time.Time, nextRoute RouteID, nextTrID uuid.UUID) Rule
rule[8] = byte(RuleForward)
binary.BigEndian.PutUint32(rule[9:], uint32(nextRoute))
rule = append(rule, nextTrID[:]...)
rule = append(rule, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0)
binary.BigEndian.PutUint32(rule[50:], uint32(requestRouteID))
return Rule(rule)
}
4 changes: 2 additions & 2 deletions pkg/routing/rule_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import (
func TestAppRule(t *testing.T) {
expireAt := time.Now().Add(2 * time.Minute)
pk, _ := cipher.GenerateKeyPair()
rule := AppRule(expireAt, 2, pk, 3, 4)
rule := AppRule(expireAt, 2, pk, 3, 4, 1)

assert.Equal(t, expireAt.Unix(), rule.Expiry().Unix())
assert.Equal(t, RuleApp, rule.Type())
Expand All @@ -28,7 +28,7 @@ func TestAppRule(t *testing.T) {
func TestForwardRule(t *testing.T) {
trID := uuid.New()
expireAt := time.Now().Add(2 * time.Minute)
rule := ForwardRule(expireAt, 2, trID)
rule := ForwardRule(expireAt, 2, trID, 1)

assert.Equal(t, expireAt.Unix(), rule.Expiry().Unix())
assert.Equal(t, RuleForward, rule.Type())
Expand Down
Loading

0 comments on commit 1aadc31

Please sign in to comment.