From 11a9b7c09b784c9ddadd91374884208296bafe19 Mon Sep 17 00:00:00 2001 From: Nikita Kryuchkov Date: Tue, 10 Sep 2019 17:29:51 +0300 Subject: [PATCH] Remote `RangeRules` from `routing.Table` --- cmd/skywire-cli/commands/node/routes.go | 13 ++-- pkg/hypervisor/hypervisor.go | 8 +-- pkg/router/route_manager.go | 60 ++++++++++-------- pkg/router/route_manager_test.go | 43 ++++++------- pkg/router/router_test.go | 10 ++- pkg/routing/table.go | 83 ++++++++++--------------- pkg/routing/table_test.go | 7 +-- pkg/visor/rpc.go | 28 ++++----- pkg/visor/rpc_client.go | 34 +++++----- pkg/visor/rpc_test.go | 2 +- pkg/visor/visor.go | 2 +- 11 files changed, 134 insertions(+), 156 deletions(-) diff --git a/cmd/skywire-cli/commands/node/routes.go b/cmd/skywire-cli/commands/node/routes.go index 9002bcb76a..e9fc46b8c4 100644 --- a/cmd/skywire-cli/commands/node/routes.go +++ b/cmd/skywire-cli/commands/node/routes.go @@ -14,7 +14,6 @@ import ( "github.com/skycoin/skywire/cmd/skywire-cli/internal" "github.com/skycoin/skywire/pkg/router" "github.com/skycoin/skywire/pkg/routing" - "github.com/skycoin/skywire/pkg/visor" ) func init() { @@ -48,7 +47,7 @@ var ruleCmd = &cobra.Command{ rule, err := rpcClient().RoutingRule(routing.RouteID(id)) internal.Catch(err) - printRoutingRules(&visor.RoutingEntry{Key: rule.KeyRouteID(), Value: rule}) + printRoutingRules(routing.RuleEntry{RouteID: rule.KeyRouteID(), Rule: rule}) }, } @@ -114,7 +113,7 @@ var addRuleCmd = &cobra.Command{ }, } -func printRoutingRules(rules ...*visor.RoutingEntry) { +func printRoutingRules(entries ...routing.RuleEntry) { printConsumeRule := func(w io.Writer, id routing.RouteID, s *routing.RuleSummary) { _, err := fmt.Fprintf(w, "%d\t%s\t%d\t%d\t%s\t%s\t%s\t%s\t%s\n", id, s.Type, s.ConsumeFields.RouteDescriptor.SrcPort, s.ConsumeFields.RouteDescriptor.DstPort, @@ -129,11 +128,11 @@ func printRoutingRules(rules ...*visor.RoutingEntry) { w := tabwriter.NewWriter(os.Stdout, 0, 0, 5, ' ', tabwriter.TabIndent) _, err := fmt.Fprintln(w, "id\ttype\tlocal-port\tremote-port\tremote-pk\tresp-id\tnext-route-id\tnext-transport-id\texpire-at") internal.Catch(err) - for _, rule := range rules { - if rule.Value.Summary().ConsumeFields != nil { - printConsumeRule(w, rule.Key, rule.Value.Summary()) + for _, entry := range entries { + if entry.Rule.Summary().ConsumeFields != nil { + printConsumeRule(w, entry.RouteID, entry.Rule.Summary()) } else { - printFwdRule(w, rule.Key, rule.Value.Summary()) + printFwdRule(w, entry.RouteID, entry.Rule.Summary()) } } internal.Catch(w.Flush()) diff --git a/pkg/hypervisor/hypervisor.go b/pkg/hypervisor/hypervisor.go index 4a3d91fce1..ca5ab0f981 100644 --- a/pkg/hypervisor/hypervisor.go +++ b/pkg/hypervisor/hypervisor.go @@ -459,14 +459,14 @@ func (m *Node) getRoutes() http.HandlerFunc { httputil.WriteJSON(w, r, http.StatusBadRequest, err) return } - rules, err := ctx.RPC.RoutingRules() + entries, err := ctx.RPC.RoutingRules() if err != nil { httputil.WriteJSON(w, r, http.StatusInternalServerError, err) return } - resp := make([]routingRuleResp, len(rules)) - for i, rule := range rules { - resp[i] = makeRoutingRuleResp(rule.Key, rule.Value, qSummary) + resp := make([]routingRuleResp, len(entries)) + for i, entry := range entries { + resp[i] = makeRoutingRuleResp(entry.RouteID, entry.Rule, qSummary) } httputil.WriteJSON(w, r, http.StatusOK, resp) }) diff --git a/pkg/router/route_manager.go b/pkg/router/route_manager.go index b281df931f..b4a716be36 100644 --- a/pkg/router/route_manager.go +++ b/pkg/router/route_manager.go @@ -167,24 +167,21 @@ func (rm *routeManager) GetRule(routeID routing.RouteID) (routing.Rule, error) { // RemoveLoopRule removes loop rule. func (rm *routeManager) RemoveLoopRule(loop routing.Loop) { - var appRouteID routing.RouteID - var consumeRule routing.Rule - rm.rt.RangeRules(func(routeID routing.RouteID, rule routing.Rule) bool { - if rule.Type() != routing.RuleConsume || rule.RouteDescriptor().DstPK() != loop.Remote.PubKey || - rule.RouteDescriptor().DstPort() != loop.Remote.Port || - rule.RouteDescriptor().SrcPort() != loop.Local.Port { - return true - } - - appRouteID = routeID - consumeRule = make(routing.Rule, len(rule)) - copy(consumeRule, rule) + remote := loop.Remote + local := loop.Local - return false - }) + entries := rm.rt.AllRules() + for _, entry := range entries { + rule := entry.Rule + if rule.Type() != routing.RuleConsume { + continue + } - if len(consumeRule) != 0 { - rm.rt.DelRules([]routing.RouteID{appRouteID}) + rd := rule.RouteDescriptor() + if rd.DstPK() == remote.PubKey && rd.DstPort() == remote.Port && rd.SrcPort() == local.Port { + rm.rt.DelRules([]routing.RouteID{entry.RouteID}) + return + } } } @@ -224,20 +221,29 @@ func (rm *routeManager) confirmLoop(data []byte) error { return err } + remote := ld.Loop.Remote + local := ld.Loop.Local + var appRouteID routing.RouteID var consumeRule routing.Rule - rm.rt.RangeRules(func(routeID routing.RouteID, rule routing.Rule) bool { - if rule.Type() != routing.RuleConsume || rule.RouteDescriptor().DstPK() != ld.Loop.Remote.PubKey || - rule.RouteDescriptor().DstPort() != ld.Loop.Remote.Port || - rule.RouteDescriptor().SrcPort() != ld.Loop.Local.Port { - return true + + entries := rm.rt.AllRules() + for _, entry := range entries { + rule := entry.Rule + if rule.Type() != routing.RuleConsume { + continue } - appRouteID = routeID - consumeRule = make(routing.Rule, len(rule)) - copy(consumeRule, rule) - return false - }) + rd := rule.RouteDescriptor() + if rd.DstPK() == remote.PubKey && rd.DstPort() == remote.Port && rd.SrcPort() == local.Port { + + appRouteID = entry.RouteID + consumeRule = make(routing.Rule, len(rule)) + copy(consumeRule, rule) + + break + } + } if consumeRule == nil { return errors.New("unknown loop") @@ -262,7 +268,7 @@ func (rm *routeManager) confirmLoop(data []byte) error { return fmt.Errorf("routing table: %s", rErr) } - rm.Logger.Infof("Confirmed loop with %s:%d", ld.Loop.Remote.PubKey, ld.Loop.Remote.Port) + rm.Logger.Infof("Confirmed loop with %s:%d", remote.PubKey, remote.Port) return nil } diff --git a/pkg/router/route_manager_test.go b/pkg/router/route_manager_test.go index 57e55e3bb1..d9e6fb70d6 100644 --- a/pkg/router/route_manager_test.go +++ b/pkg/router/route_manager_test.go @@ -22,7 +22,7 @@ func TestNewRouteManager(t *testing.T) { env := snettest.NewEnv(t, []snettest.KeyPair{{PK: pk, SK: sk}}) defer env.Teardown() - rt := routing.New() + rt := routing.NewWithConfig(routing.Config{GCInterval: 100 * time.Millisecond}) rm, err := newRouteManager(env.Nets[0], rt, RMConfig{}) require.NoError(t, err) @@ -30,17 +30,15 @@ func TestNewRouteManager(t *testing.T) { // CLOSURE: Delete all routing rules. clearRules := func() { - var rules []routing.RouteID - rt.RangeRules(func(routeID routing.RouteID, _ routing.Rule) (next bool) { - rules = append(rules[0:], routeID) - return true - }) - rt.DelRules(rules) + entries := rt.AllRules() + for _, entry := range entries { + rt.DelRules([]routing.RouteID{entry.RouteID}) + } } // TEST: Set and get expired and unexpired rule. t.Run("GetRule", func(t *testing.T) { - defer clearRules() + clearRules() expiredRule := routing.IntermediaryForwardRule(-10*time.Minute, 1, 3, uuid.New()) expiredID, err := rm.rt.ReserveKey() @@ -54,6 +52,8 @@ func TestNewRouteManager(t *testing.T) { err = rm.rt.SaveRule(id, rule) require.NoError(t, err) + defer rm.rt.DelRules([]routing.RouteID{id, expiredID}) + // rule should already be expired at this point due to the execution time. // However, we'll just a bit to be sure time.Sleep(1 * time.Millisecond) @@ -71,7 +71,7 @@ func TestNewRouteManager(t *testing.T) { // TEST: Ensure removing loop rules work properly. t.Run("RemoveLoopRule", func(t *testing.T) { - defer clearRules() + clearRules() pk, _ := cipher.GenerateKeyPair() rule := routing.ConsumeRule(10*time.Minute, 1, pk, 2, 3) @@ -92,7 +92,7 @@ func TestNewRouteManager(t *testing.T) { // TEST: Ensure AddRule and DeleteRule requests from a SetupNode does as expected. t.Run("AddRemoveRule", func(t *testing.T) { - defer clearRules() + clearRules() // Add/Remove rules multiple times. for i := 0; i < 5; i++ { @@ -109,16 +109,6 @@ func TestNewRouteManager(t *testing.T) { close(errCh) }() - // TODO: remove defer from for loop - defer func() { - require.NoError(t, requestIDIn.Close()) - require.NoError(t, addIn.Close()) - require.NoError(t, delIn.Close()) - for err := range errCh { - require.NoError(t, err) - } - }() - // Emulate SetupNode sending RequestRegistrationID request. ids, err := setup.RequestRouteIDs(context.TODO(), setup.NewSetupProtocol(requestIDIn), 1) require.NoError(t, err) @@ -142,12 +132,19 @@ func TestNewRouteManager(t *testing.T) { r, err = rt.Rule(ids[0]) assert.Error(t, err) assert.Nil(t, r) + + require.NoError(t, requestIDIn.Close()) + require.NoError(t, addIn.Close()) + require.NoError(t, delIn.Close()) + for err := range errCh { + require.NoError(t, err) + } } }) // TEST: Ensure DeleteRule requests from SetupNode is handled properly. t.Run("DelRules", func(t *testing.T) { - defer clearRules() + clearRules() in, out := net.Pipe() errCh := make(chan error, 1) @@ -177,7 +174,7 @@ func TestNewRouteManager(t *testing.T) { // TEST: Ensure ConfirmLoop request from SetupNode is handled properly. t.Run("ConfirmLoop", func(t *testing.T) { - defer clearRules() + clearRules() var inLoop routing.Loop var inRule routing.Rule @@ -230,7 +227,7 @@ func TestNewRouteManager(t *testing.T) { // TEST: Ensure LoopClosed request from SetupNode is handled properly. t.Run("LoopClosed", func(t *testing.T) { - defer clearRules() + clearRules() var inLoop routing.Loop diff --git a/pkg/router/router_test.go b/pkg/router/router_test.go index 011dbed807..f0f127c3db 100644 --- a/pkg/router/router_test.go +++ b/pkg/router/router_test.go @@ -61,12 +61,10 @@ func TestRouter_Serve(t *testing.T) { // CLOSURE: clear all rules in all router. clearRules := func(routers ...*Router) { for _, r := range routers { - var rtIDs []routing.RouteID - r.rm.rt.RangeRules(func(rtID routing.RouteID, _ routing.Rule) bool { - rtIDs = append(rtIDs, rtID) - return true - }) - r.rm.rt.DelRules(rtIDs) + entries := r.rm.rt.AllRules() + for _, entry := range entries { + r.rm.rt.DelRules([]routing.RouteID{entry.RouteID}) + } } } diff --git a/pkg/routing/table.go b/pkg/routing/table.go index 96e4a992ca..a1dca807c7 100644 --- a/pkg/routing/table.go +++ b/pkg/routing/table.go @@ -18,9 +18,6 @@ var ( ErrNoAvailableRoutes = errors.New("no available routeIDs") ) -// RangeFunc is used by RangeRules to iterate over rules. -type RangeFunc func(routeID RouteID, rule Rule) (next bool) - // Table represents a routing table implementation. type Table interface { // ReserveKey reserves a RouteID. @@ -36,14 +33,11 @@ type Table interface { RulesWithDesc(RouteDescriptor) []Rule // AllRules returns all non timed out rules. - AllRules() []Rule + AllRules() []RuleEntry // DelRules removes RoutingRules with a given a RouteIDs. DelRules([]RouteID) - // RangeRules iterates over all rules and yields values to the rangeFunc until `next` is false. - RangeRules(RangeFunc) - // Count returns the number of RoutingRule entries stored. Count() int } @@ -143,49 +137,48 @@ func (mt *memTable) RulesWithDesc(desc RouteDescriptor) []Rule { return rules } -func (mt *memTable) AllRules() []Rule { +// RuleEntry is a pair of a RouteID and a Rule. +type RuleEntry struct { + RouteID RouteID + Rule Rule +} + +func (mt *memTable) AllRules() []RuleEntry { mt.RLock() defer mt.RUnlock() - rules := make([]Rule, 0, len(mt.rules)) + rules := make([]RuleEntry, 0, len(mt.rules)) for k, v := range mt.rules { if !mt.ruleIsTimedOut(k, v) { - rules = append(rules, v) + entry := RuleEntry{ + RouteID: k, + Rule: v, + } + rules = append(rules, entry) } } return rules } -func (mt *memTable) RangeRules(rangeFunc RangeFunc) { - mt.RLock() - defer mt.RUnlock() - - for routeID, rule := range mt.rules { - if !rangeFunc(routeID, rule) { - break - } - } -} - func (mt *memTable) DelRules(routeIDs []RouteID) { - mt.Lock() - defer mt.Unlock() - for _, routeID := range routeIDs { - delete(mt.rules, routeID) + mt.Lock() + mt.delRule(routeID) + mt.Unlock() } } +func (mt *memTable) delRule(routeID RouteID) { + delete(mt.rules, routeID) + delete(mt.activity, routeID) +} + func (mt *memTable) Count() int { mt.RLock() - count := len(mt.rules) - mt.RUnlock() - return count -} + defer mt.RUnlock() -func (mt *memTable) Close() error { - return nil + return len(mt.rules) } // Routing table garbage collect loop. @@ -199,33 +192,21 @@ func (mt *memTable) gcLoop() { } func (mt *memTable) gc() { - expiredIDs := make([]RouteID, 0) + mt.Lock() + defer mt.Unlock() - mt.RangeRules(func(routeID RouteID, rule Rule) bool { + for routeID, rule := range mt.rules { if rule.Type() == RuleIntermediaryForward && mt.ruleIsTimedOut(routeID, rule) { - expiredIDs = append(expiredIDs, routeID) + mt.delRule(routeID) } - return true - }) - - mt.DelRules(expiredIDs) - - mt.Lock() - defer mt.Unlock() - mt.deleteActivity(expiredIDs...) + } } // ruleIsExpired checks whether rule's keep alive timeout is exceeded. // NOTE: for internal use, is NOT thread-safe, object lock should be acquired outside func (mt *memTable) ruleIsTimedOut(routeID RouteID, rule Rule) bool { lastActivity, ok := mt.activity[routeID] - return !ok || time.Since(lastActivity) > rule.KeepAlive() -} - -// deleteActivity removes activity records for the specified set of `routeIDs`. -// NOTE: for internal use, is NOT thread-safe, object lock should be acquired outside -func (mt *memTable) deleteActivity(routeIDs ...RouteID) { - for _, rID := range routeIDs { - delete(mt.activity, rID) - } + idling := time.Since(lastActivity) + keepAlive := rule.KeepAlive() + return !ok || idling > keepAlive } diff --git a/pkg/routing/table_test.go b/pkg/routing/table_test.go index fa6eeda2e7..b9b8ba1657 100644 --- a/pkg/routing/table_test.go +++ b/pkg/routing/table_test.go @@ -58,10 +58,9 @@ func RoutingTableSuite(t *testing.T, tbl Table) { assert.Equal(t, rule, r) ids := make([]RouteID, 0) - tbl.RangeRules(func(routeID RouteID, _ Rule) bool { - ids = append(ids, routeID) - return true - }) + for _, entry := range tbl.AllRules() { + ids = append(ids, entry.RouteID) + } require.ElementsMatch(t, []RouteID{id, id2}, ids) tbl.DelRules([]RouteID{id, id2}) diff --git a/pkg/visor/rpc.go b/pkg/visor/rpc.go index 9c7a086b72..d0f9718d26 100644 --- a/pkg/visor/rpc.go +++ b/pkg/visor/rpc.go @@ -306,12 +306,8 @@ type RoutingEntry struct { } // RoutingRules obtains all routing rules of the RoutingTable. -func (r *RPC) RoutingRules(_ *struct{}, out *[]*RoutingEntry) error { - r.node.rt.RangeRules(func(routeID routing.RouteID, rule routing.Rule) (next bool) { - *out = append(*out, &RoutingEntry{Key: routeID, Value: rule}) - return true - }) - +func (r *RPC) RoutingRules(_ *struct{}, out *[]routing.RuleEntry) error { + *out = r.node.rt.AllRules() return nil } @@ -358,20 +354,24 @@ type LoopInfo struct { // Loops retrieves loops via rules of the routing table. func (r *RPC) Loops(_ *struct{}, out *[]LoopInfo) error { var loops []LoopInfo - r.node.rt.RangeRules(func(_ routing.RouteID, rule routing.Rule) (next bool) { - if rule.Type() == routing.RuleConsume { - loops = append(loops, LoopInfo{ConsumeRule: rule}) + + entries := r.node.rt.AllRules() + for _, entry := range entries { + if entry.Rule.Type() != routing.RuleConsume { + continue } - return true - }) - for i, l := range loops { - fwdRID := l.ConsumeRule.NextRouteID() + + fwdRID := entry.Rule.NextRouteID() rule, err := r.node.rt.Rule(fwdRID) if err != nil { return err } - loops[i].FwdRule = rule + loops = append(loops, LoopInfo{ + ConsumeRule: rule, + FwdRule: rule, + }) } + *out = loops return nil } diff --git a/pkg/visor/rpc_client.go b/pkg/visor/rpc_client.go index fbd0979bfd..1c89b7e53a 100644 --- a/pkg/visor/rpc_client.go +++ b/pkg/visor/rpc_client.go @@ -39,7 +39,7 @@ type RPCClient interface { AddTransport(remote cipher.PubKey, tpType string, public bool, timeout time.Duration) (*TransportSummary, error) RemoveTransport(tid uuid.UUID) error - RoutingRules() ([]*RoutingEntry, error) + RoutingRules() ([]routing.RuleEntry, error) RoutingRule(key routing.RouteID) (routing.Rule, error) AddRoutingRule(rule routing.Rule) (routing.RouteID, error) SetRoutingRule(key routing.RouteID, rule routing.Rule) error @@ -176,8 +176,8 @@ func (rc *rpcClient) RemoveTransport(tid uuid.UUID) error { } // RoutingRules calls RoutingRules. -func (rc *rpcClient) RoutingRules() ([]*RoutingEntry, error) { - var entries []*RoutingEntry +func (rc *rpcClient) RoutingRules() ([]routing.RuleEntry, error) { + var entries []routing.RuleEntry err := rc.Call("RoutingRules", &struct{}{}, &entries) return entries, err } @@ -470,13 +470,8 @@ func (mc *mockRPCClient) RemoveTransport(tid uuid.UUID) error { } // RoutingRules implements RPCClient. -func (mc *mockRPCClient) RoutingRules() ([]*RoutingEntry, error) { - var entries []*RoutingEntry - mc.rt.RangeRules(func(routeID routing.RouteID, rule routing.Rule) (next bool) { - entries = append(entries, &RoutingEntry{Key: routeID, Value: rule}) - return true - }) - return entries, nil +func (mc *mockRPCClient) RoutingRules() ([]routing.RuleEntry, error) { + return mc.rt.AllRules(), nil } // RoutingRule implements RPCClient. @@ -508,19 +503,22 @@ func (mc *mockRPCClient) RemoveRoutingRule(key routing.RouteID) error { // Loops implements RPCClient. func (mc *mockRPCClient) Loops() ([]LoopInfo, error) { var loops []LoopInfo - mc.rt.RangeRules(func(_ routing.RouteID, rule routing.Rule) (next bool) { - if rule.Type() == routing.RuleConsume { - loops = append(loops, LoopInfo{ConsumeRule: rule}) + entries := mc.rt.AllRules() + for _, entry := range entries { + if entry.Rule.Type() != routing.RuleConsume { + continue } - return true - }) - for i, l := range loops { - fwdRID := l.ConsumeRule.NextRouteID() + + fwdRID := entry.Rule.NextRouteID() rule, err := mc.rt.Rule(fwdRID) if err != nil { return nil, err } - loops[i].FwdRule = rule + loops = append(loops, LoopInfo{ + ConsumeRule: entry.Rule, + FwdRule: rule, + }) } + return loops, nil } diff --git a/pkg/visor/rpc_test.go b/pkg/visor/rpc_test.go index 765f895f56..c08d716107 100644 --- a/pkg/visor/rpc_test.go +++ b/pkg/visor/rpc_test.go @@ -318,7 +318,7 @@ These tests have been commented out for the following reasons: // t.Run("Transport", func(t *testing.T) { // var ids []uuid.UUID // node.tm.WalkTransports(func(tp *transport.ManagedTransport) bool { -// ids = append(ids, tp.Entry.ID) +// ids = append(ids, tp.RuleEntry.ID) // return true // }) // diff --git a/pkg/visor/visor.go b/pkg/visor/visor.go index 375d1ee677..a3b1f08d09 100644 --- a/pkg/visor/visor.go +++ b/pkg/visor/visor.go @@ -428,7 +428,7 @@ func (node *Node) SpawnApp(config *AppConfig, startCh chan<- struct{}) (err erro node.startedApps[config.App] = bind node.startedMu.Unlock() - // TODO: make PackageLogger return *Entry. FieldLogger doesn't expose Writer. + // TODO: make PackageLogger return *RuleEntry. FieldLogger doesn't expose Writer. logger := node.logger.WithField("_module", fmt.Sprintf("%s.v%s", config.App, config.Version)).Writer() defer func() { if logErr := logger.Close(); err == nil && logErr != nil {