diff --git a/pkg/router/managed_routing_table.go b/pkg/router/managed_routing_table.go deleted file mode 100644 index 6c4a49792..000000000 --- a/pkg/router/managed_routing_table.go +++ /dev/null @@ -1,100 +0,0 @@ -package router - -import ( - "errors" - "sync" - "time" - - "github.com/skycoin/skywire/pkg/routing" -) - -var ( - // ErrRuleTimedOut is being returned while trying to access the rule which timed out - ErrRuleTimedOut = errors.New("rule keep-alive timeout exceeded") -) - -type managedRoutingTable struct { - routing.Table - - activity map[routing.RouteID]time.Time - mu sync.Mutex -} - -func manageRoutingTable(rt routing.Table) *managedRoutingTable { - return &managedRoutingTable{ - Table: rt, - activity: make(map[routing.RouteID]time.Time), - } -} - -func (rt *managedRoutingTable) AddRule(rule routing.Rule) (routing.RouteID, error) { - rt.mu.Lock() - defer rt.mu.Unlock() - - routeID, err := rt.Table.AddRule(rule) - if err != nil { - return 0, err - } - - // set the initial activity for rule not to be timed out instantly - rt.activity[routeID] = time.Now() - - return routeID, nil -} - -func (rt *managedRoutingTable) Rule(routeID routing.RouteID) (routing.Rule, error) { - rt.mu.Lock() - defer rt.mu.Unlock() - - rule, err := rt.Table.Rule(routeID) - if err != nil { - return nil, err - } - - if rt.ruleIsTimedOut(routeID, rule) { - return nil, ErrRuleTimedOut - } - - rt.activity[routeID] = time.Now() - - return rule, nil -} - -func (rt *managedRoutingTable) Cleanup() error { - expiredIDs := make([]routing.RouteID, 0) - rt.mu.Lock() - defer rt.mu.Unlock() - - err := rt.RangeRules(func(routeID routing.RouteID, rule routing.Rule) bool { - if rt.ruleIsTimedOut(routeID, rule) { - expiredIDs = append(expiredIDs, routeID) - } - return true - }) - if err != nil { - return err - } - - if err := rt.DeleteRules(expiredIDs...); err != nil { - return err - } - - rt.deleteActivity(expiredIDs...) - - return nil -} - -// ruleIsExpired checks whether rule's keep alive timeout is exceeded. -// NOTE: for internal use, is NOT thread-safe, object lock should be acquired outside -func (rt *managedRoutingTable) ruleIsTimedOut(routeID routing.RouteID, rule routing.Rule) bool { - lastActivity, ok := rt.activity[routeID] - return !ok || time.Since(lastActivity) > rule.KeepAlive() -} - -// deleteActivity removes activity records for the specified set of `routeIDs`. -// NOTE: for internal use, is NOT thread-safe, object lock should be acquired outside -func (rt *managedRoutingTable) deleteActivity(routeIDs ...routing.RouteID) { - for _, rID := range routeIDs { - delete(rt.activity, rID) - } -} diff --git a/pkg/router/managed_routing_table_test.go b/pkg/router/managed_routing_table_test.go deleted file mode 100644 index 58ff8398c..000000000 --- a/pkg/router/managed_routing_table_test.go +++ /dev/null @@ -1,43 +0,0 @@ -package router - -import ( - "testing" - "time" - - "github.com/google/uuid" - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" - - "github.com/skycoin/skywire/pkg/routing" -) - -func TestManagedRoutingTableCleanup(t *testing.T) { - rt := manageRoutingTable(routing.InMemoryRoutingTable()) - - _, err := rt.AddRule(routing.IntermediaryForwardRule(1*time.Hour, 1, 3, uuid.New())) - require.NoError(t, err) - - id, err := rt.AddRule(routing.IntermediaryForwardRule(1*time.Hour, 2, 3, uuid.New())) - require.NoError(t, err) - - id2, err := rt.AddRule(routing.IntermediaryForwardRule(-1*time.Hour, 3, 3, uuid.New())) - require.NoError(t, err) - - // rule should already be expired at this point due to the execution time. - // However, we'll just a bit to be sure - time.Sleep(1 * time.Millisecond) - - assert.Equal(t, 3, rt.Count()) - - _, err = rt.Rule(id) - require.NoError(t, err) - - assert.NotNil(t, rt.activity[id]) - - require.NoError(t, rt.Cleanup()) - assert.Equal(t, 2, rt.Count()) - - rule, err := rt.Rule(id2) - require.Error(t, err) - assert.Nil(t, rule) -} diff --git a/pkg/router/route_manager.go b/pkg/router/route_manager.go index 788c7ed9b..23ee78681 100644 --- a/pkg/router/route_manager.go +++ b/pkg/router/route_manager.go @@ -6,7 +6,6 @@ import ( "errors" "fmt" "net" - "time" "github.com/google/uuid" "github.com/skycoin/dmsg/cipher" @@ -19,10 +18,9 @@ import ( // RMConfig represents route manager configuration. type RMConfig struct { - SetupPKs []cipher.PubKey // Trusted setup PKs. - GarbageCollectDuration time.Duration - OnConfirmLoop func(loop routing.Loop, rule routing.Rule) (err error) - OnLoopClosed func(loop routing.Loop) error + SetupPKs []cipher.PubKey // Trusted setup PKs. + OnConfirmLoop func(loop routing.Loop, rule routing.Rule) (err error) + OnLoopClosed func(loop routing.Loop) error } // SetupIsTrusted checks if setup node is trusted. @@ -41,7 +39,7 @@ type routeManager struct { conf RMConfig n *snet.Network sl *snet.Listener // Listens for setup node requests. - rt *managedRoutingTable + rt routing.Table done chan struct{} } @@ -56,7 +54,7 @@ func newRouteManager(n *snet.Network, rt routing.Table, config RMConfig) (*route conf: config, n: n, sl: sl, - rt: manageRoutingTable(rt), + rt: rt, done: make(chan struct{}), }, nil } @@ -69,9 +67,6 @@ func (rm *routeManager) Close() error { // Serve initiates serving connections by route manager. func (rm *routeManager) Serve() { - // Routing table garbage collect loop. - go rm.rtGarbageCollectLoop() - // Accept setup node request loop. for { if err := rm.serveConn(); err != nil { @@ -139,24 +134,6 @@ func (rm *routeManager) handleSetupConn(conn net.Conn) error { return proto.WritePacket(setup.RespSuccess, respBody) } -func (rm *routeManager) rtGarbageCollectLoop() { - if rm.conf.GarbageCollectDuration <= 0 { - return - } - ticker := time.NewTicker(rm.conf.GarbageCollectDuration) - defer ticker.Stop() - for { - select { - case <-rm.done: - return - case <-ticker.C: - if err := rm.rt.Cleanup(); err != nil { - rm.Logger.WithError(err).Warnf("routing table cleanup returned error") - } - } - } -} - func (rm *routeManager) dialSetupConn(_ context.Context) (*snet.Conn, error) { for _, sPK := range rm.conf.SetupPKs { conn, err := rm.n.Dial(snet.DmsgType, sPK, snet.SetupPort) diff --git a/pkg/router/route_manager_test.go b/pkg/router/route_manager_test.go index d24af9036..d8871a421 100644 --- a/pkg/router/route_manager_test.go +++ b/pkg/router/route_manager_test.go @@ -22,7 +22,7 @@ func TestNewRouteManager(t *testing.T) { env := snettest.NewEnv(t, []snettest.KeyPair{{PK: pk, SK: sk}}) defer env.Teardown() - rt := routing.InMemoryRoutingTable() + rt := routing.New() rm, err := newRouteManager(env.Nets[0], rt, RMConfig{}) require.NoError(t, err) diff --git a/pkg/router/router.go b/pkg/router/router.go index 7be7c3fe1..a0f6cbffb 100644 --- a/pkg/router/router.go +++ b/pkg/router/router.go @@ -26,9 +26,6 @@ const ( // DefaultRouteKeepAlive is the default expiration interval for routes DefaultRouteKeepAlive = 2 * time.Hour - // DefaultGarbageCollectDuration is the default duration for garbage collection of routing rules. - DefaultGarbageCollectDuration = time.Second * 5 - minHops = 0 maxHops = 50 ) @@ -37,14 +34,13 @@ var log = logging.MustGetLogger("router") // Config configures Router. type Config struct { - Logger *logging.Logger - PubKey cipher.PubKey - SecKey cipher.SecKey - TransportManager *transport.Manager - RoutingTable routing.Table - RouteFinder routeFinder.Client - SetupNodes []cipher.PubKey - GarbageCollectDuration time.Duration + Logger *logging.Logger + PubKey cipher.PubKey + SecKey cipher.SecKey + TransportManager *transport.Manager + RoutingTable routing.Table + RouteFinder routeFinder.Client + SetupNodes []cipher.PubKey } // SetDefaults sets default values for certain empty values. @@ -52,9 +48,6 @@ func (c *Config) SetDefaults() { if c.Logger == nil { c.Logger = log } - if c.GarbageCollectDuration <= 0 { - c.GarbageCollectDuration = DefaultGarbageCollectDuration - } } // Router implements node.PacketRouter. It manages routing table by @@ -90,10 +83,9 @@ func New(n *snet.Network, config *Config) (*Router, error) { // Prepare route manager. rm, err := newRouteManager(n, config.RoutingTable, RMConfig{ - SetupPKs: config.SetupNodes, - GarbageCollectDuration: config.GarbageCollectDuration, - OnConfirmLoop: r.confirmLoop, - OnLoopClosed: r.loopClosed, + SetupPKs: config.SetupNodes, + OnConfirmLoop: r.confirmLoop, + OnLoopClosed: r.loopClosed, }) if err != nil { return nil, err diff --git a/pkg/router/router_test.go b/pkg/router/router_test.go index b79aa06e7..bf45578ff 100644 --- a/pkg/router/router_test.go +++ b/pkg/router/router_test.go @@ -207,14 +207,13 @@ func NewTestEnv(t *testing.T, nets []*snet.Network) *TestEnv { func (e *TestEnv) GenRouterConfig(i int) *Config { return &Config{ - Logger: logging.MustGetLogger(fmt.Sprintf("router_%d", i)), - PubKey: e.TpMngrConfs[i].PubKey, - SecKey: e.TpMngrConfs[i].SecKey, - TransportManager: e.TpMngrs[i], - RoutingTable: routing.InMemoryRoutingTable(), - RouteFinder: routeFinder.NewMock(), - SetupNodes: nil, // TODO - GarbageCollectDuration: DefaultGarbageCollectDuration, + Logger: logging.MustGetLogger(fmt.Sprintf("router_%d", i)), + PubKey: e.TpMngrConfs[i].PubKey, + SecKey: e.TpMngrConfs[i].SecKey, + TransportManager: e.TpMngrs[i], + RoutingTable: routing.New(), + RouteFinder: routeFinder.NewMock(), + SetupNodes: nil, // TODO } } diff --git a/pkg/routing/routing_table.go b/pkg/routing/routing_table.go index 8ba5410ac..8d280408e 100644 --- a/pkg/routing/routing_table.go +++ b/pkg/routing/routing_table.go @@ -6,8 +6,19 @@ import ( "math" "sync" "sync/atomic" + "time" + + "github.com/skycoin/skycoin/src/util/logging" ) +var log = logging.MustGetLogger("routing") + +// DefaultGCInterval is the default duration for garbage collection of routing rules. +const DefaultGCInterval = 5 * time.Second + +// ErrRuleTimedOut is being returned while trying to access the rule which timed out +var ErrRuleTimedOut = errors.New("rule keep-alive timeout exceeded") + // RangeFunc is used by RangeRules to iterate over rules. type RangeFunc func(routeID RouteID, rule Rule) (next bool) @@ -30,86 +41,172 @@ type Table interface { // Count returns the number of RoutingRule entries stored. Count() int - - // Close safely closes routing table. - Close() error } -type inMemoryRoutingTable struct { +type memTable struct { sync.RWMutex - nextID uint32 - rules map[RouteID]Rule + nextID uint32 + rules map[RouteID]Rule + activity map[RouteID]time.Time + gcInterval time.Duration +} + +// Config represents a routing table configuration. +type Config struct { + GCInterval time.Duration +} + +// DefaultConfig represents the default configuration of routing table. +func DefaultConfig() Config { + return Config{ + GCInterval: DefaultGCInterval, + } +} + +// New returns an in-memory routing table implementation. +func New() Table { + return NewWithConfig(DefaultConfig()) } -// InMemoryRoutingTable return in-memory RoutingTable implementation. -func InMemoryRoutingTable() Table { - return &inMemoryRoutingTable{ - rules: map[RouteID]Rule{}, +// NewWithConfig returns an in-memory routing table implementation with a specified configuration. +func NewWithConfig(config Config) Table { + if config.GCInterval <= 0 { + config.GCInterval = DefaultGCInterval } + + mt := &memTable{ + rules: map[RouteID]Rule{}, + activity: make(map[RouteID]time.Time), + gcInterval: config.GCInterval, + } + + go mt.gcLoop() + + return mt } -func (rt *inMemoryRoutingTable) AddRule(rule Rule) (routeID RouteID, err error) { +func (mt *memTable) AddRule(rule Rule) (routeID RouteID, err error) { if routeID == math.MaxUint32 { return 0, errors.New("no available routeIDs") } - routeID = RouteID(atomic.AddUint32(&rt.nextID, 1)) + routeID = RouteID(atomic.AddUint32(&mt.nextID, 1)) - rt.Lock() - rt.rules[routeID] = rule - rt.Unlock() + mt.Lock() + mt.rules[routeID] = rule + mt.activity[routeID] = time.Now() + mt.Unlock() return routeID, nil } -func (rt *inMemoryRoutingTable) SetRule(routeID RouteID, rule Rule) error { - rt.Lock() - rt.rules[routeID] = rule - rt.Unlock() +func (mt *memTable) SetRule(routeID RouteID, rule Rule) error { + mt.Lock() + mt.rules[routeID] = rule + mt.Unlock() return nil } -func (rt *inMemoryRoutingTable) Rule(routeID RouteID) (Rule, error) { - rt.RLock() - rule, ok := rt.rules[routeID] - rt.RUnlock() +func (mt *memTable) Rule(routeID RouteID) (Rule, error) { + mt.RLock() + rule, ok := mt.rules[routeID] + mt.RUnlock() + if !ok { return nil, fmt.Errorf("rule of id %v not found", routeID) } + + if mt.ruleIsTimedOut(routeID, rule) { + return nil, ErrRuleTimedOut + } + return rule, nil } -func (rt *inMemoryRoutingTable) RangeRules(rangeFunc RangeFunc) error { - rt.RLock() - for routeID, rule := range rt.rules { +func (mt *memTable) RangeRules(rangeFunc RangeFunc) error { + mt.RLock() + for routeID, rule := range mt.rules { if !rangeFunc(routeID, rule) { break } } - rt.RUnlock() + mt.RUnlock() return nil } -func (rt *inMemoryRoutingTable) DeleteRules(routeIDs ...RouteID) error { - rt.Lock() +func (mt *memTable) DeleteRules(routeIDs ...RouteID) error { + mt.Lock() for _, routeID := range routeIDs { - delete(rt.rules, routeID) + delete(mt.rules, routeID) } - rt.Unlock() + mt.Unlock() return nil } -func (rt *inMemoryRoutingTable) Count() int { - rt.RLock() - count := len(rt.rules) - rt.RUnlock() +func (mt *memTable) Count() int { + mt.RLock() + count := len(mt.rules) + mt.RUnlock() return count } -func (rt *inMemoryRoutingTable) Close() error { +func (mt *memTable) Close() error { return nil } + +// Routing table garbage collect loop. +func (mt *memTable) gcLoop() { + if mt.gcInterval <= 0 { + return + } + ticker := time.NewTicker(mt.gcInterval) + defer ticker.Stop() + for range ticker.C { + if err := mt.gc(); err != nil { + log.WithError(err).Warnf("routing table gc") + } + } +} + +func (mt *memTable) gc() error { + expiredIDs := make([]RouteID, 0) + + err := mt.RangeRules(func(routeID RouteID, rule Rule) bool { + if rule.Type() == RuleIntermediaryForward && mt.ruleIsTimedOut(routeID, rule) { + expiredIDs = append(expiredIDs, routeID) + } + return true + }) + if err != nil { + return err + } + + if err := mt.DeleteRules(expiredIDs...); err != nil { + return err + } + + mt.Lock() + defer mt.Unlock() + mt.deleteActivity(expiredIDs...) + + return nil +} + +// ruleIsExpired checks whether rule's keep alive timeout is exceeded. +// NOTE: for internal use, is NOT thread-safe, object lock should be acquired outside +func (mt *memTable) ruleIsTimedOut(routeID RouteID, rule Rule) bool { + lastActivity, ok := mt.activity[routeID] + return !ok || time.Since(lastActivity) > rule.KeepAlive() +} + +// deleteActivity removes activity records for the specified set of `routeIDs`. +// NOTE: for internal use, is NOT thread-safe, object lock should be acquired outside +func (mt *memTable) deleteActivity(routeIDs ...RouteID) { + for _, rID := range routeIDs { + delete(mt.activity, rID) + } +} diff --git a/pkg/routing/routing_table_test.go b/pkg/routing/routing_table_test.go index 0de4b55a0..5c3cbb45e 100644 --- a/pkg/routing/routing_table_test.go +++ b/pkg/routing/routing_table_test.go @@ -63,5 +63,40 @@ func RoutingTableSuite(t *testing.T, tbl Table) { } func TestRoutingTable(t *testing.T) { - RoutingTableSuite(t, InMemoryRoutingTable()) + RoutingTableSuite(t, New()) +} + +func TestRoutingTableCleanup(t *testing.T) { + rt := &memTable{ + rules: map[RouteID]Rule{}, + activity: make(map[RouteID]time.Time), + gcInterval: DefaultGCInterval, + } + + _, err := rt.AddRule(IntermediaryForwardRule(1*time.Hour, 1, 3, uuid.New())) + require.NoError(t, err) + + id, err := rt.AddRule(IntermediaryForwardRule(1*time.Hour, 2, 3, uuid.New())) + require.NoError(t, err) + + id2, err := rt.AddRule(IntermediaryForwardRule(-1*time.Hour, 3, 3, uuid.New())) + require.NoError(t, err) + + // rule should already be expired at this point due to the execution time. + // However, we'll just a bit to be sure + time.Sleep(1 * time.Millisecond) + + assert.Equal(t, 3, rt.Count()) + + _, err = rt.Rule(id) + require.NoError(t, err) + + assert.NotNil(t, rt.activity[id]) + + require.NoError(t, rt.gc()) + assert.Equal(t, 2, rt.Count()) + + rule, err := rt.Rule(id2) + require.Error(t, err) + assert.Nil(t, rule) } diff --git a/pkg/visor/config.go b/pkg/visor/config.go index 41290ea20..976f12996 100644 --- a/pkg/visor/config.go +++ b/pkg/visor/config.go @@ -104,7 +104,7 @@ func (c *Config) TransportLogStore() (transport.LogStore, error) { // RoutingTable returns configure routing.Table. func (c *Config) RoutingTable() (routing.Table, error) { - return routing.InMemoryRoutingTable(), nil + return routing.New(), nil } // AppsConfig decodes AppsConfig from a local json config file. diff --git a/pkg/visor/rpc_client.go b/pkg/visor/rpc_client.go index 737d9ab5c..3169d6796 100644 --- a/pkg/visor/rpc_client.go +++ b/pkg/visor/rpc_client.go @@ -244,7 +244,7 @@ func NewMockRPCClient(r *rand.Rand, maxTps int, maxRules int) (cipher.PubKey, RP } log.Infof("tp[%2d]: %v", i, tps[i]) } - rt := routing.InMemoryRoutingTable() + rt := routing.New() ruleKeepAlive := router.DefaultRouteKeepAlive for i := 0; i < r.Intn(maxRules+1); i++ { remotePK, _ := cipher.GenerateKeyPair() diff --git a/pkg/visor/rpc_test.go b/pkg/visor/rpc_test.go index b04428e59..765f895f5 100644 --- a/pkg/visor/rpc_test.go +++ b/pkg/visor/rpc_test.go @@ -171,7 +171,7 @@ These tests have been commented out for the following reasons: // config: conf, // router: r, // tm: tm1, -// rt: routing.InMemoryRoutingTable(), +// rt: routing.New(), // executer: executer, // appsConf: apps, // startedApps: map[string]*appBind{},