Skip to content

Commit

Permalink
Adjust client code for keep-alive
Browse files Browse the repository at this point in the history
  • Loading branch information
Darkren committed Aug 22, 2019
1 parent 5e7d081 commit 54b5095
Show file tree
Hide file tree
Showing 3 changed files with 48 additions and 15 deletions.
40 changes: 36 additions & 4 deletions pkg/router/managed_routing_table.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,16 @@
package router

import (
"errors"
"sync"
"time"

"github.com/skycoin/skywire/pkg/routing"
)

var routeKeepalive = 10 * time.Minute // interval to keep active expired routes
var (
ErrRuleTimedOut = errors.New("rule keep-alive timeout exceeded")
)

type managedRoutingTable struct {
routing.Table
Expand All @@ -23,18 +26,40 @@ func manageRoutingTable(rt routing.Table) *managedRoutingTable {
}
}

func (rt *managedRoutingTable) Rule(routeID routing.RouteID) (routing.Rule, error) {
func (rt *managedRoutingTable) AddRule(rule routing.Rule) (routing.RouteID, error) {
routeID, err := rt.Table.AddRule(rule)
if err != nil {
return 0, err
}

rt.mu.Lock()
// set the initial activity for rule not to be timed out instantly
rt.activity[routeID] = time.Now()
rt.mu.Unlock()
return rt.Table.Rule(routeID)

return routeID, nil
}

func (rt *managedRoutingTable) Rule(routeID routing.RouteID) (routing.Rule, error) {
rule, err := rt.Table.Rule(routeID)
if err != nil {
return nil, err
}

rt.mu.Lock()
defer rt.mu.Unlock()
if rt.ruleIsTimedOut(routeID, rule) {
return nil, ErrRuleTimedOut
}

return rule, nil
}

func (rt *managedRoutingTable) Cleanup() error {
expiredIDs := make([]routing.RouteID, 0)
rt.mu.Lock()
err := rt.RangeRules(func(routeID routing.RouteID, rule routing.Rule) bool {
if lastActivity, ok := rt.activity[routeID]; !ok || time.Since(lastActivity) > routeKeepalive {
if rt.ruleIsTimedOut(routeID, rule) {
expiredIDs = append(expiredIDs, routeID)
}
return true
Expand All @@ -47,3 +72,10 @@ func (rt *managedRoutingTable) Cleanup() error {

return rt.DeleteRules(expiredIDs...)
}

// ruleIsExpired checks whether rule's keep alive timeout is exceeded.
// NOTE: for internal use, is NOT thread-safe, object lock should be acquired outside
func (rt *managedRoutingTable) ruleIsTimedOut(routeID routing.RouteID, rule routing.Rule) bool {
lastActivity, ok := rt.activity[routeID]
return !ok || time.Since(lastActivity) > rule.KeepAlive()
}
12 changes: 6 additions & 6 deletions pkg/routing/loop.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,10 @@ func (l Loop) String() string {

// LoopDescriptor defines a loop over a pair of routes.
type LoopDescriptor struct {
Loop Loop
Forward Route
Reverse Route
Expiry time.Time
Loop Loop
Forward Route
Reverse Route
KeepAlive time.Duration
}

// Initiator returns initiator of the Loop.
Expand All @@ -45,8 +45,8 @@ func (l LoopDescriptor) Responder() cipher.PubKey {
}

func (l LoopDescriptor) String() string {
return fmt.Sprintf("lport: %d. rport: %d. routes: %s/%s. expire at %s",
l.Loop.Local.Port, l.Loop.Remote.Port, l.Forward, l.Reverse, l.Expiry)
return fmt.Sprintf("lport: %d. rport: %d. routes: %s/%s. keep-alive timeout %s",
l.Loop.Local.Port, l.Loop.Remote.Port, l.Forward, l.Reverse, l.KeepAlive)
}

// LoopData stores loop confirmation request data.
Expand Down
11 changes: 6 additions & 5 deletions pkg/setup/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,12 +135,12 @@ func (sn *Node) serveTransport(ctx context.Context, tr *dmsg.Transport) error {

func (sn *Node) createLoop(ctx context.Context, ld routing.LoopDescriptor) error {
sn.Logger.Infof("Creating new Loop %s", ld)
rRouteID, err := sn.createRoute(ctx, ld.Expiry, ld.Reverse, ld.Loop.Local.Port, ld.Loop.Remote.Port)
rRouteID, err := sn.createRoute(ctx, ld.KeepAlive, ld.Reverse, ld.Loop.Local.Port, ld.Loop.Remote.Port)
if err != nil {
return err
}

fRouteID, err := sn.createRoute(ctx, ld.Expiry, ld.Forward, ld.Loop.Remote.Port, ld.Loop.Local.Port)
fRouteID, err := sn.createRoute(ctx, ld.KeepAlive, ld.Forward, ld.Loop.Remote.Port, ld.Loop.Local.Port)
if err != nil {
return err
}
Expand Down Expand Up @@ -220,7 +220,8 @@ func (sn *Node) createLoop(ctx context.Context, ld routing.LoopDescriptor) error
//
// During the setup process each error received along the way causes all the procedure to be canceled. RouteID received
// from the 1st step connecting to the initiating node is used as the ID for the overall rule, thus being returned.
func (sn *Node) createRoute(ctx context.Context, expireAt time.Time, route routing.Route, rport, lport routing.Port) (routing.RouteID, error) {
func (sn *Node) createRoute(ctx context.Context, keepAlive time.Duration, route routing.Route,
rport, lport routing.Port) (routing.RouteID, error) {
if len(route) == 0 {
return 0, nil
}
Expand Down Expand Up @@ -270,9 +271,9 @@ func (sn *Node) createRoute(ctx context.Context, expireAt time.Time, route routi
if i != len(r)-1 {
reqIDChIn = reqIDsCh[i]
nextTpID = r[i+1].Transport
rule = routing.ForwardRule(expireAt, 0, nextTpID, 0)
rule = routing.ForwardRule(keepAlive, 0, nextTpID, 0)
} else {
rule = routing.AppRule(expireAt, 0, init, lport, rport, 0)
rule = routing.AppRule(keepAlive, 0, init, lport, rport, 0)
}

go func(i int, pk cipher.PubKey, rule routing.Rule, reqIDChIn <-chan routing.RouteID,
Expand Down

0 comments on commit 54b5095

Please sign in to comment.