Skip to content

Commit

Permalink
Merge pull request #527 from Darkren/fix/rule-expiration
Browse files Browse the repository at this point in the history
Change rule expiry to keep-alive
  • Loading branch information
志宇 authored Sep 4, 2019
2 parents 30daea1 + aa4b798 commit 2a81b33
Show file tree
Hide file tree
Showing 16 changed files with 141 additions and 85 deletions.
12 changes: 6 additions & 6 deletions cmd/skywire-cli/commands/node/routes.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,10 +64,10 @@ var rmRuleCmd = &cobra.Command{
},
}

var expire time.Duration
var keepAlive time.Duration

func init() {
addRuleCmd.PersistentFlags().DurationVar(&expire, "expire", router.RouteTTL, "duration after which routing rule will expire")
addRuleCmd.PersistentFlags().DurationVar(&keepAlive, "keep-alive", router.DefaultRouteKeepAlive, "duration after which routing rule will expire if no activity is present")
}

var addRuleCmd = &cobra.Command{
Expand Down Expand Up @@ -100,13 +100,13 @@ var addRuleCmd = &cobra.Command{
remotePort = routing.Port(parseUint("remote-port", args[3], 16))
localPort = routing.Port(parseUint("local-port", args[4], 16))
)
rule = routing.AppRule(time.Now().Add(expire), routeID, remotePK, remotePort, localPort, 0)
rule = routing.AppRule(keepAlive, routeID, remotePK, remotePort, localPort, 0)
case "fwd":
var (
nextRouteID = routing.RouteID(parseUint("next-route-id", args[1], 32))
nextTpID = internal.ParseUUID("next-transport-id", args[2])
)
rule = routing.ForwardRule(time.Now().Add(expire), nextRouteID, nextTpID, 0)
rule = routing.ForwardRule(keepAlive, nextRouteID, nextTpID, 0)
}
rIDKey, err := rpcClient().AddRoutingRule(rule)
internal.Catch(err)
Expand All @@ -117,12 +117,12 @@ var addRuleCmd = &cobra.Command{
func printRoutingRules(rules ...*visor.RoutingEntry) {
printAppRule := func(w io.Writer, id routing.RouteID, s *routing.RuleSummary) {
_, err := fmt.Fprintf(w, "%d\t%s\t%d\t%d\t%s\t%d\t%s\t%s\t%s\n", id, s.Type, s.AppFields.LocalPort,
s.AppFields.RemotePort, s.AppFields.RemotePK, s.AppFields.RespRID, "-", "-", s.ExpireAt)
s.AppFields.RemotePort, s.AppFields.RemotePK, s.AppFields.RespRID, "-", "-", s.KeepAlive)
internal.Catch(err)
}
printFwdRule := func(w io.Writer, id routing.RouteID, s *routing.RuleSummary) {
_, err := fmt.Fprintf(w, "%d\t%s\t%s\t%s\t%s\t%s\t%d\t%s\t%s\n", id, s.Type, "-",
"-", "-", "-", s.ForwardFields.NextRID, s.ForwardFields.NextTID, s.ExpireAt)
"-", "-", "-", s.ForwardFields.NextRID, s.ForwardFields.NextTID, s.KeepAlive)
internal.Catch(err)
}
w := tabwriter.NewWriter(os.Stdout, 0, 0, 5, ' ', tabwriter.TabIndent)
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -28,4 +28,4 @@ require (
)

// Uncomment for tests with alternate branches of 'dmsg'
//replace github.com/skycoin/dmsg => ../dmsg
replace github.com/skycoin/dmsg => ../dmsg
69 changes: 59 additions & 10 deletions pkg/router/managed_routing_table.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,17 @@
package router

import (
"errors"
"sync"
"time"

"github.com/skycoin/skywire/pkg/routing"
)

var routeKeepalive = 10 * time.Minute // interval to keep active expired routes
var (
// ErrRuleTimedOut is being returned while trying to access the rule which timed out
ErrRuleTimedOut = errors.New("rule keep-alive timeout exceeded")
)

type managedRoutingTable struct {
routing.Table
Expand All @@ -23,29 +27,74 @@ func manageRoutingTable(rt routing.Table) *managedRoutingTable {
}
}

func (rt *managedRoutingTable) AddRule(rule routing.Rule) (routing.RouteID, error) {
rt.mu.Lock()
defer rt.mu.Unlock()

routeID, err := rt.Table.AddRule(rule)
if err != nil {
return 0, err
}

// set the initial activity for rule not to be timed out instantly
rt.activity[routeID] = time.Now()

return routeID, nil
}

func (rt *managedRoutingTable) Rule(routeID routing.RouteID) (routing.Rule, error) {
rt.mu.Lock()
defer rt.mu.Unlock()

rule, err := rt.Table.Rule(routeID)
if err != nil {
return nil, err
}

if rt.ruleIsTimedOut(routeID, rule) {
return nil, ErrRuleTimedOut
}

rt.activity[routeID] = time.Now()
rt.mu.Unlock()
return rt.Table.Rule(routeID)

return rule, nil
}

func (rt *managedRoutingTable) Cleanup() error {
expiredIDs := make([]routing.RouteID, 0)
rt.mu.Lock()
defer rt.mu.Unlock()

err := rt.RangeRules(func(routeID routing.RouteID, rule routing.Rule) bool {
if rule.Expiry().Before(time.Now()) {
if lastActivity, ok := rt.activity[routeID]; !ok || time.Since(lastActivity) > routeKeepalive {
expiredIDs = append(expiredIDs, routeID)
}
if rt.ruleIsTimedOut(routeID, rule) {
expiredIDs = append(expiredIDs, routeID)
}
return true
})
rt.mu.Unlock()

if err != nil {
return err
}

return rt.DeleteRules(expiredIDs...)
if err := rt.DeleteRules(expiredIDs...); err != nil {
return err
}

rt.deleteActivity(expiredIDs...)

return nil
}

// ruleIsExpired checks whether rule's keep alive timeout is exceeded.
// NOTE: for internal use, is NOT thread-safe, object lock should be acquired outside
func (rt *managedRoutingTable) ruleIsTimedOut(routeID routing.RouteID, rule routing.Rule) bool {
lastActivity, ok := rt.activity[routeID]
return !ok || time.Since(lastActivity) > rule.KeepAlive()
}

// deleteActivity removes activity records for the specified set of `routeIDs`.
// NOTE: for internal use, is NOT thread-safe, object lock should be acquired outside
func (rt *managedRoutingTable) deleteActivity(routeIDs ...routing.RouteID) {
for _, rID := range routeIDs {
delete(rt.activity, rID)
}
}
10 changes: 7 additions & 3 deletions pkg/router/managed_routing_table_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,19 @@ import (
func TestManagedRoutingTableCleanup(t *testing.T) {
rt := manageRoutingTable(routing.InMemoryRoutingTable())

_, err := rt.AddRule(routing.ForwardRule(time.Now().Add(time.Hour), 3, uuid.New(), 1))
_, err := rt.AddRule(routing.ForwardRule(1*time.Hour, 3, uuid.New(), 1))
require.NoError(t, err)

id, err := rt.AddRule(routing.ForwardRule(time.Now().Add(-time.Hour), 3, uuid.New(), 2))
id, err := rt.AddRule(routing.ForwardRule(1*time.Hour, 3, uuid.New(), 2))
require.NoError(t, err)

id2, err := rt.AddRule(routing.ForwardRule(time.Now().Add(-time.Hour), 3, uuid.New(), 3))
id2, err := rt.AddRule(routing.ForwardRule(-1*time.Hour, 3, uuid.New(), 3))
require.NoError(t, err)

// rule should already be expired at this point due to the execution time.
// However, we'll just a bit to be sure
time.Sleep(1 * time.Millisecond)

assert.Equal(t, 3, rt.Count())

_, err = rt.Rule(id)
Expand Down
6 changes: 1 addition & 5 deletions pkg/router/route_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,10 +186,6 @@ func (rm *routeManager) GetRule(routeID routing.RouteID) (routing.Rule, error) {
return nil, errors.New("corrupted rule")
}

if rule.Expiry().Before(time.Now()) {
return nil, errors.New("expired routing rule")
}

return rule, nil
}

Expand Down Expand Up @@ -317,7 +313,7 @@ func (rm *routeManager) loopClosed(data []byte) error {
}

func (rm *routeManager) occupyRouteID() ([]routing.RouteID, error) {
rule := routing.ForwardRule(time.Now().Add(RouteTTL), 0, uuid.UUID{}, 0)
rule := routing.ForwardRule(DefaultRouteKeepAlive, 0, uuid.UUID{}, 0)
routeID, err := rm.rt.AddRule(rule)
if err != nil {
return nil, err
Expand Down
26 changes: 15 additions & 11 deletions pkg/router/route_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,14 +43,18 @@ func TestNewRouteManager(t *testing.T) {
t.Run("GetRule", func(t *testing.T) {
defer clearRules()

expiredRule := routing.ForwardRule(time.Now().Add(-10*time.Minute), 3, uuid.New(), 1)
expiredID, err := rt.AddRule(expiredRule)
expiredRule := routing.ForwardRule(-10*time.Minute, 3, uuid.New(), 1)
expiredID, err := rm.rt.AddRule(expiredRule)
require.NoError(t, err)

rule := routing.ForwardRule(time.Now().Add(10*time.Minute), 3, uuid.New(), 2)
id, err := rt.AddRule(rule)
rule := routing.ForwardRule(10*time.Minute, 3, uuid.New(), 2)
id, err := rm.rt.AddRule(rule)
require.NoError(t, err)

// rule should already be expired at this point due to the execution time.
// However, we'll just a bit to be sure
time.Sleep(1 * time.Millisecond)

_, err = rm.GetRule(expiredID)
require.Error(t, err)

Expand All @@ -67,7 +71,7 @@ func TestNewRouteManager(t *testing.T) {
defer clearRules()

pk, _ := cipher.GenerateKeyPair()
rule := routing.AppRule(time.Now(), 3, pk, 3, 2, 1)
rule := routing.AppRule(10*time.Minute, 3, pk, 3, 2, 1)
_, err := rt.AddRule(rule)
require.NoError(t, err)

Expand Down Expand Up @@ -112,7 +116,7 @@ func TestNewRouteManager(t *testing.T) {
require.NoError(t, err)

// Emulate SetupNode sending AddRule request.
rule := routing.ForwardRule(time.Now(), 3, uuid.New(), id)
rule := routing.ForwardRule(10*time.Minute, 3, uuid.New(), id)
err = setup.AddRule(context.TODO(), setup.NewSetupProtocol(addIn), rule)
require.NoError(t, err)

Expand Down Expand Up @@ -150,7 +154,7 @@ func TestNewRouteManager(t *testing.T) {

proto := setup.NewSetupProtocol(in)

rule := routing.ForwardRule(time.Now(), 3, uuid.New(), 1)
rule := routing.ForwardRule(10*time.Minute, 3, uuid.New(), 1)
id, err := rt.AddRule(rule)
require.NoError(t, err)
assert.Equal(t, 1, rt.Count())
Expand Down Expand Up @@ -186,10 +190,10 @@ func TestNewRouteManager(t *testing.T) {

proto := setup.NewSetupProtocol(in)
pk, _ := cipher.GenerateKeyPair()
rule := routing.AppRule(time.Now(), 3, pk, 3, 2, 2)
rule := routing.AppRule(10*time.Minute, 3, pk, 3, 2, 2)
require.NoError(t, rt.SetRule(2, rule))

rule = routing.ForwardRule(time.Now(), 3, uuid.New(), 1)
rule = routing.ForwardRule(10*time.Minute, 3, uuid.New(), 1)
require.NoError(t, rt.SetRule(1, rule))

ld := routing.LoopData{
Expand Down Expand Up @@ -238,10 +242,10 @@ func TestNewRouteManager(t *testing.T) {
proto := setup.NewSetupProtocol(in)
pk, _ := cipher.GenerateKeyPair()

rule := routing.AppRule(time.Now(), 3, pk, 3, 2, 0)
rule := routing.AppRule(10*time.Minute, 3, pk, 3, 2, 0)
require.NoError(t, rt.SetRule(2, rule))

rule = routing.ForwardRule(time.Now(), 3, uuid.New(), 1)
rule = routing.ForwardRule(10*time.Minute, 3, uuid.New(), 1)
require.NoError(t, rt.SetRule(1, rule))

ld := routing.LoopData{
Expand Down
10 changes: 5 additions & 5 deletions pkg/router/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@ import (
)

const (
// RouteTTL is the default expiration interval for routes
RouteTTL = 2 * time.Hour
// DefaultRouteKeepAlive is the default expiration interval for routes
DefaultRouteKeepAlive = 2 * time.Hour

// DefaultGarbageCollectDuration is the default duration for garbage collection of routing rules.
DefaultGarbageCollectDuration = time.Second * 5
Expand Down Expand Up @@ -298,9 +298,9 @@ func (r *Router) requestLoop(ctx context.Context, appConn *app.Protocol, raddr r
Local: laddr,
Remote: raddr,
},
Expiry: time.Now().Add(RouteTTL),
Forward: forwardRoute,
Reverse: reverseRoute,
KeepAlive: DefaultRouteKeepAlive,
Forward: forwardRoute,
Reverse: reverseRoute,
}

sConn, err := r.rm.dialSetupConn(ctx)
Expand Down
2 changes: 1 addition & 1 deletion pkg/router/router_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ func TestRouter_Serve(t *testing.T) {
defer clearRules(r0, r1)

// Add a FWD rule for r0.
fwdRule := routing.ForwardRule(time.Now().Add(time.Hour), routing.RouteID(5), tp1.Entry.ID, routing.RouteID(0))
fwdRule := routing.ForwardRule(1*time.Hour, routing.RouteID(5), tp1.Entry.ID, routing.RouteID(0))
fwdRtID, err := r0.rm.rt.AddRule(fwdRule)
require.NoError(t, err)

Expand Down
12 changes: 6 additions & 6 deletions pkg/routing/loop.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,10 @@ func (l Loop) String() string {

// LoopDescriptor defines a loop over a pair of routes.
type LoopDescriptor struct {
Loop Loop
Forward Route
Reverse Route
Expiry time.Time
Loop Loop
Forward Route
Reverse Route
KeepAlive time.Duration
}

// Initiator returns initiator of the Loop.
Expand All @@ -45,8 +45,8 @@ func (l LoopDescriptor) Responder() cipher.PubKey {
}

func (l LoopDescriptor) String() string {
return fmt.Sprintf("lport: %d. rport: %d. routes: %s/%s. expire at %s",
l.Loop.Local.Port, l.Loop.Remote.Port, l.Forward, l.Reverse, l.Expiry)
return fmt.Sprintf("lport: %d. rport: %d. routes: %s/%s. keep-alive timeout %s",
l.Loop.Local.Port, l.Loop.Remote.Port, l.Forward, l.Reverse, l.KeepAlive)
}

// LoopData stores loop confirmation request data.
Expand Down
4 changes: 2 additions & 2 deletions pkg/routing/routing_table_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ func TestMain(m *testing.M) {
func RoutingTableSuite(t *testing.T, tbl Table) {
t.Helper()

rule := ForwardRule(time.Now(), 2, uuid.New(), 1)
rule := ForwardRule(15*time.Minute, 2, uuid.New(), 1)
id, err := tbl.AddRule(rule)
require.NoError(t, err)

Expand All @@ -39,7 +39,7 @@ func RoutingTableSuite(t *testing.T, tbl Table) {
require.NoError(t, err)
assert.Equal(t, rule, r)

rule2 := ForwardRule(time.Now(), 3, uuid.New(), 2)
rule2 := ForwardRule(15*time.Minute, 3, uuid.New(), 2)
id2, err := tbl.AddRule(rule2)
require.NoError(t, err)

Expand Down
Loading

0 comments on commit 2a81b33

Please sign in to comment.