From 59aefb53f60e38a862eb97c24a26249361d459cf Mon Sep 17 00:00:00 2001 From: Nikita Kryuchkov Date: Tue, 8 Oct 2019 20:17:16 +0300 Subject: [PATCH] Minor fixes --- pkg/router/router.go | 69 ++++++++++++++++-------- pkg/router/router_test.go | 30 +++++------ pkg/router/routerclient/client.go | 1 + pkg/router/rpc_gateway.go | 8 +-- pkg/routing/table.go | 31 ++++++++--- pkg/routing/table_test.go | 16 +++--- pkg/setup/node.go | 85 +++++++++++++++++++++++++++++ pkg/setup/rpc_gateway.go | 89 +++---------------------------- pkg/setup/setupclient/client.go | 20 +++---- pkg/setup/setupclient/wrappers.go | 4 +- pkg/visor/rpc_client.go | 10 ++-- pkg/visor/visor_test.go | 2 +- 12 files changed, 210 insertions(+), 155 deletions(-) diff --git a/pkg/router/router.go b/pkg/router/router.go index 49f3b5774c..2666e039a9 100644 --- a/pkg/router/router.go +++ b/pkg/router/router.go @@ -56,11 +56,13 @@ type DialOptions struct { MaxConsumeRts int } -var DefaultDialOptions = &DialOptions{ - MinForwardRts: 1, - MaxForwardRts: 1, - MinConsumeRts: 1, - MaxConsumeRts: 1, +func DefaultDialOptions() DialOptions { + return DialOptions{ + MinForwardRts: 1, + MaxForwardRts: 1, + MinConsumeRts: 1, + MaxConsumeRts: 1, + } } type Router interface { @@ -80,7 +82,7 @@ type Router interface { // Then the following should happen: // - Save to routing.Table and internal RouteGroup map. // - Return the RoutingGroup. - AcceptRoutes() (*RouteGroup, error) + AcceptRoutes(context.Context) (*RouteGroup, error) Serve(context.Context) error @@ -92,18 +94,20 @@ type Router interface { // rules and manages loops for apps. type router struct { mx sync.Mutex + once sync.Once + done chan struct{} wg sync.WaitGroup conf *Config logger *logging.Logger n *snet.Network sl *snet.Listener - accept chan routing.EdgeRules trustedNodes map[cipher.PubKey]struct{} tm *transport.Manager rt routing.Table rfc rfclient.Client // route finder client rgs map[routing.RouteDescriptor]*RouteGroup // route groups to push incoming reads from transports. rpcSrv *rpc.Server + accept chan routing.EdgeRules } // New constructs a new Router. @@ -183,8 +187,14 @@ func (r *router) DialRoutes(ctx context.Context, rPK cipher.PubKey, lPort, rPort // Then the following should happen: // - Save to routing.Table and internal RouteGroup map. // - Return the RoutingGroup. -func (r *router) AcceptRoutes() (*RouteGroup, error) { - rules := <-r.accept +func (r *router) AcceptRoutes(ctx context.Context) (*RouteGroup, error) { + var rules routing.EdgeRules + select { + case <-ctx.Done(): + return nil, ctx.Err() + case rules = <-r.accept: + break + } if err := r.saveRoutingRules(rules.Forward, rules.Reverse); err != nil { return nil, err @@ -332,8 +342,17 @@ func (r *router) Close() error { if r == nil { return nil } + r.logger.Info("Closing all App connections and Loops") + r.once.Do(func() { + close(r.done) + + r.mx.Lock() + close(r.accept) + r.mx.Unlock() + }) + if err := r.sl.Close(); err != nil { r.logger.WithError(err).Warnf("closing route_manager returned error") } @@ -374,7 +393,8 @@ func (r *router) RemoveRouteDescriptor(desc routing.RouteDescriptor) { func (r *router) fetchBestRoutes(source, destination cipher.PubKey, opts *DialOptions) (fwd routing.Path, rev routing.Path, err error) { // TODO(nkryuchkov): use opts if opts == nil { - opts = DefaultDialOptions + defaultOpts := DefaultDialOptions() + opts = &defaultOpts } r.logger.Infof("Requesting new routes from %s to %s", source, destination) @@ -420,18 +440,6 @@ func (r *router) saveRoutingRules(rules ...routing.Rule) error { return nil } -func (r *router) occupyRouteID(n uint8) ([]routing.RouteID, error) { - var ids = make([]routing.RouteID, n) - for i := range ids { - routeID, err := r.rt.ReserveKey() - if err != nil { - return nil, err - } - ids[i] = routeID - } - return ids, nil -} - func (r *router) routeGroup(desc routing.RouteDescriptor) (*RouteGroup, bool) { r.mx.Lock() defer r.mx.Unlock() @@ -439,3 +447,20 @@ func (r *router) routeGroup(desc routing.RouteDescriptor) (*RouteGroup, bool) { rg, ok := r.rgs[desc] return rg, ok } + +func (r *router) IntroduceRules(rules routing.EdgeRules) error { + select { + case <-r.done: + return io.ErrClosedPipe + default: + r.mx.Lock() + defer r.mx.Unlock() + + select { + case r.accept <- rules: + return nil + case <-r.done: + return io.ErrClosedPipe + } + } +} diff --git a/pkg/router/router_test.go b/pkg/router/router_test.go index ebce2393d5..ef871d3f6e 100644 --- a/pkg/router/router_test.go +++ b/pkg/router/router_test.go @@ -79,15 +79,15 @@ func TestRouter_Serve(t *testing.T) { defer clearRules(r0, r1) // Add a FWD rule for r0. - fwdRtID, err := r0.rt.ReserveKey() + fwdRtID, err := r0.rt.ReserveKeys(1) require.NoError(t, err) - fwdRule := routing.IntermediaryForwardRule(1*time.Hour, fwdRtID, routing.RouteID(5), tp1.Entry.ID) + fwdRule := routing.IntermediaryForwardRule(1*time.Hour, fwdRtID[0], routing.RouteID(5), tp1.Entry.ID) err = r0.rt.SaveRule(fwdRule) require.NoError(t, err) // Call handleTransportPacket for r0 (this should in turn, use the rule we added). - packet := routing.MakeDataPacket(fwdRtID, []byte("This is a test!")) + packet := routing.MakeDataPacket(fwdRtID[0], []byte("This is a test!")) require.NoError(t, r0.handleTransportPacket(context.TODO(), packet)) // r1 should receive the packet handled by r0. @@ -199,33 +199,33 @@ func TestRouter_Rules(t *testing.T) { t.Run("GetRule", func(t *testing.T) { clearRules() - expiredID, err := r.rt.ReserveKey() + expiredID, err := r.rt.ReserveKeys(1) require.NoError(t, err) - expiredRule := routing.IntermediaryForwardRule(-10*time.Minute, expiredID, 3, uuid.New()) + expiredRule := routing.IntermediaryForwardRule(-10*time.Minute, expiredID[0], 3, uuid.New()) err = r.rt.SaveRule(expiredRule) require.NoError(t, err) - id, err := r.rt.ReserveKey() + id, err := r.rt.ReserveKeys(1) require.NoError(t, err) - rule := routing.IntermediaryForwardRule(10*time.Minute, id, 3, uuid.New()) + rule := routing.IntermediaryForwardRule(10*time.Minute, id[0], 3, uuid.New()) err = r.rt.SaveRule(rule) require.NoError(t, err) - defer r.rt.DelRules([]routing.RouteID{id, expiredID}) + defer r.rt.DelRules([]routing.RouteID{id[0], expiredID[0]}) // rule should already be expired at this point due to the execution time. // However, we'll just a bit to be sure time.Sleep(1 * time.Millisecond) - _, err = r.GetRule(expiredID) + _, err = r.GetRule(expiredID[0]) require.Error(t, err) _, err = r.GetRule(123) require.Error(t, err) - r, err := r.GetRule(id) + r, err := r.GetRule(id[0]) require.NoError(t, err) assert.Equal(t, rule, r) }) @@ -236,10 +236,10 @@ func TestRouter_Rules(t *testing.T) { pk, _ := cipher.GenerateKeyPair() - id, err := r.rt.ReserveKey() + id, err := r.rt.ReserveKeys(1) require.NoError(t, err) - rule := routing.ConsumeRule(10*time.Minute, id, pk, 2, 3) + rule := routing.ConsumeRule(10*time.Minute, id[0], pk, 2, 3) err = r.rt.SaveRule(rule) require.NoError(t, err) @@ -323,17 +323,17 @@ func TestRouter_Rules(t *testing.T) { proto := setup.NewSetupProtocol(in) - id, err := r.rt.ReserveKey() + id, err := r.rt.ReserveKeys(1) require.NoError(t, err) - rule := routing.IntermediaryForwardRule(10*time.Minute, id, 3, uuid.New()) + rule := routing.IntermediaryForwardRule(10*time.Minute, id[0], 3, uuid.New()) err = r.rt.SaveRule(rule) require.NoError(t, err) assert.Equal(t, 1, rt.Count()) - require.NoError(t, setup.DeleteRule(context.TODO(), proto, id)) + require.NoError(t, setup.DeleteRule(context.TODO(), proto, id[0])) assert.Equal(t, 0, rt.Count()) }) diff --git a/pkg/router/routerclient/client.go b/pkg/router/routerclient/client.go index a161218de2..259e872d33 100644 --- a/pkg/router/routerclient/client.go +++ b/pkg/router/routerclient/client.go @@ -47,6 +47,7 @@ func (c *Client) Close() error { return nil } +// TODO: make sure that deadline functions are used, then get rid of context here and below func (c *Client) AddEdgeRules(ctx context.Context, rules routing.EdgeRules) (bool, error) { var ok bool err := c.call(ctx, rpcName+".AddEdgeRules", rules, &ok) diff --git a/pkg/router/rpc_gateway.go b/pkg/router/rpc_gateway.go index 7d63adee77..a7e67f7dab 100644 --- a/pkg/router/rpc_gateway.go +++ b/pkg/router/rpc_gateway.go @@ -20,9 +20,9 @@ func NewRPCGateway(router *router) *RPCGateway { } func (r *RPCGateway) AddEdgeRules(rules routing.EdgeRules, ok *bool) error { - go func() { - r.router.accept <- rules - }() + if err := r.router.IntroduceRules(rules); err != nil { + return err + } if err := r.router.saveRoutingRules(rules.Forward, rules.Reverse); err != nil { *ok = false @@ -46,7 +46,7 @@ func (r *RPCGateway) AddIntermediaryRules(rules []routing.Rule, ok *bool) error } func (r *RPCGateway) ReserveIDs(n uint8, routeIDs *[]routing.RouteID) error { - ids, err := r.router.occupyRouteID(n) + ids, err := r.router.rt.ReserveKeys(int(n)) if err != nil { r.logger.WithError(err).Warnf("Request completed with error.") return setup.Failure{Code: setup.FailureReserveRtIDs, Msg: err.Error()} diff --git a/pkg/routing/table.go b/pkg/routing/table.go index 433d92c858..c3137bda5f 100644 --- a/pkg/routing/table.go +++ b/pkg/routing/table.go @@ -20,8 +20,8 @@ var ( // Table represents a routing table implementation. type Table interface { - // ReserveKey reserves a RouteID. - ReserveKey() (RouteID, error) + // ReserveKeys reserves n RouteIDs. + ReserveKeys(n int) ([]RouteID, error) // SaveRule sets RoutingRule for a given RouteID. SaveRule(Rule) error @@ -80,16 +80,33 @@ func NewTable(config Config) Table { return mt } -func (mt *memTable) ReserveKey() (key RouteID, err error) { +func (mt *memTable) ReserveKeys(n int) ([]RouteID, error) { + first, last, err := mt.reserveKeysImpl(n) + if err != nil { + return nil, err + } + + routes := make([]RouteID, 0, n) + for id := first; id <= last; id++ { + routes = append(routes, id) + } + + return routes, nil +} + +func (mt *memTable) reserveKeysImpl(n int) (first, last RouteID, err error) { mt.Lock() defer mt.Unlock() - if mt.nextID == math.MaxUint32 { - return 0, ErrNoAvailableRoutes + if int64(mt.nextID)+int64(n) >= math.MaxUint32 { + return 0, 0, ErrNoAvailableRoutes } - mt.nextID++ - return mt.nextID, nil + first = mt.nextID + 1 + mt.nextID += RouteID(n) + last = mt.nextID + + return first, last, nil } func (mt *memTable) SaveRule(rule Rule) error { diff --git a/pkg/routing/table_test.go b/pkg/routing/table_test.go index 3e67fad381..a2f18eb1c5 100644 --- a/pkg/routing/table_test.go +++ b/pkg/routing/table_test.go @@ -30,30 +30,30 @@ func TestMain(m *testing.M) { func RoutingTableSuite(t *testing.T, tbl Table) { t.Helper() - id, err := tbl.ReserveKey() + id, err := tbl.ReserveKeys(1) require.NoError(t, err) - rule := IntermediaryForwardRule(15*time.Minute, id, 2, uuid.New()) + rule := IntermediaryForwardRule(15*time.Minute, id[0], 2, uuid.New()) err = tbl.SaveRule(rule) require.NoError(t, err) assert.Equal(t, 1, tbl.Count()) - r, err := tbl.Rule(id) + r, err := tbl.Rule(id[0]) require.NoError(t, err) assert.Equal(t, rule, r) - id2, err := tbl.ReserveKey() + id2, err := tbl.ReserveKeys(1) require.NoError(t, err) - rule2 := IntermediaryForwardRule(15*time.Minute, id2, 3, uuid.New()) + rule2 := IntermediaryForwardRule(15*time.Minute, id2[0], 3, uuid.New()) err = tbl.SaveRule(rule2) require.NoError(t, err) assert.Equal(t, 2, tbl.Count()) require.NoError(t, tbl.SaveRule(rule)) - r, err = tbl.Rule(id) + r, err = tbl.Rule(id[0]) require.NoError(t, err) assert.Equal(t, rule, r) @@ -61,9 +61,9 @@ func RoutingTableSuite(t *testing.T, tbl Table) { for _, rule := range tbl.AllRules() { ids = append(ids, rule.KeyRouteID()) } - require.ElementsMatch(t, []RouteID{id, id2}, ids) + require.ElementsMatch(t, []RouteID{id[0], id2[0]}, ids) - tbl.DelRules([]RouteID{id, id2}) + tbl.DelRules([]RouteID{id[0], id2[0]}) assert.Equal(t, 0, tbl.Count()) } diff --git a/pkg/setup/node.go b/pkg/setup/node.go index 612d042f8e..979c67883c 100644 --- a/pkg/setup/node.go +++ b/pkg/setup/node.go @@ -4,12 +4,15 @@ import ( "context" "fmt" "net/rpc" + "sync" "github.com/skycoin/dmsg" "github.com/skycoin/dmsg/disc" "github.com/skycoin/skycoin/src/util/logging" "github.com/skycoin/skywire/pkg/metrics" + "github.com/skycoin/skywire/pkg/router/routerclient" + "github.com/skycoin/skywire/pkg/routing" "github.com/skycoin/skywire/pkg/snet" ) @@ -89,3 +92,85 @@ func (sn *Node) Serve() error { go rpcS.ServeConn(conn) } } + +func (sn *Node) handleDialRouteGroup(ctx context.Context, route routing.BidirectionalRoute) (routing.EdgeRules, error) { + idr, err := sn.reserveRouteIDs(ctx, route) + if err != nil { + return routing.EdgeRules{}, err + } + + forwardRoute := routing.Route{ + Desc: route.Desc, + Path: route.Forward, + KeepAlive: route.KeepAlive, + } + reverseRoute := routing.Route{ + Desc: route.Desc.Invert(), + Path: route.Reverse, + KeepAlive: route.KeepAlive, + } + + // Determine the rules to send to visors using loop descriptor and reserved route IDs. + forwardRules, consumeRules, intermediaryRules, err := idr.GenerateRules(forwardRoute, reverseRoute) + if err != nil { + return routing.EdgeRules{}, err + } + + sn.logger.Infof("generated forward rules: %v", forwardRules) + sn.logger.Infof("generated consume rules: %v", consumeRules) + sn.logger.Infof("generated intermediary rules: %v", intermediaryRules) + + errCh := make(chan error, len(intermediaryRules)) + var wg sync.WaitGroup + for pk, rules := range intermediaryRules { + wg.Add(1) + pk, rules := pk, rules + go func() { + defer wg.Done() + if _, err := routerclient.AddIntermediaryRules(ctx, sn.logger, sn.dmsgC, pk, rules); err != nil { + sn.logger.WithField("remote", pk).WithError(err).Warn("failed to add rules") + errCh <- err + } + }() + } + + wg.Wait() + close(errCh) + + if err := finalError(len(intermediaryRules), errCh); err != nil { + return routing.EdgeRules{}, err + } + + initRouteRules := routing.EdgeRules{ + Desc: forwardRoute.Desc, + Forward: forwardRules[route.Desc.SrcPK()], + Reverse: consumeRules[route.Desc.SrcPK()], + } + respRouteRules := routing.EdgeRules{ + Desc: reverseRoute.Desc, + Forward: forwardRules[route.Desc.DstPK()], + Reverse: consumeRules[route.Desc.DstPK()], + } + + // Confirm routes with responding visor. + ok, err := routerclient.AddEdgeRules(ctx, sn.logger, sn.dmsgC, route.Desc.DstPK(), respRouteRules) + if err != nil || !ok { + return routing.EdgeRules{}, fmt.Errorf("failed to confirm loop with destination visor: %v", err) + } + + return initRouteRules, nil +} + +func (sn *Node) reserveRouteIDs(ctx context.Context, route routing.BidirectionalRoute) (*idReservoir, error) { + reservoir, total := newIDReservoir(route.Forward, route.Reverse) + sn.logger.Infof("There are %d route IDs to reserve.", total) + + err := reservoir.ReserveIDs(ctx, sn.logger, sn.dmsgC, routerclient.ReserveIDs) + if err != nil { + sn.logger.WithError(err).Warnf("Failed to reserve route IDs.") + return nil, err + } + + sn.logger.Infof("Successfully reserved route IDs.") + return reservoir, err +} diff --git a/pkg/setup/rpc_gateway.go b/pkg/setup/rpc_gateway.go index 5a276a815a..89dfad187d 100644 --- a/pkg/setup/rpc_gateway.go +++ b/pkg/setup/rpc_gateway.go @@ -3,13 +3,11 @@ package setup import ( "context" "fmt" - "sync" "time" "github.com/skycoin/dmsg/cipher" "github.com/skycoin/skycoin/src/util/logging" - "github.com/skycoin/skywire/pkg/router/routerclient" "github.com/skycoin/skywire/pkg/routing" ) @@ -21,101 +19,30 @@ type RPCGateway struct { func NewRPCGateway(reqPK cipher.PubKey, sn *Node) *RPCGateway { return &RPCGateway{ - logger: logging.MustGetLogger("setup-gateway"), + logger: logging.MustGetLogger(fmt.Sprintf("setup-gateway (%s)", reqPK)), reqPK: reqPK, sn: sn, } } -func (g *RPCGateway) DialRouteGroup(route routing.BidirectionalRoute, rules *routing.EdgeRules) (failure error) { +func (g *RPCGateway) DialRouteGroup(route routing.BidirectionalRoute, rules *routing.EdgeRules) (err error) { startTime := time.Now() defer func() { - g.sn.metrics.Record(time.Since(startTime), failure != nil) + g.sn.metrics.Record(time.Since(startTime), err != nil) }() g.logger.Infof("Received RPC DialRouteGroup request") - ctx := context.Background() - idr, err := g.reserveRouteIDs(ctx, route) - if err != nil { - return err - } - - forwardRoute := routing.Route{ - Desc: route.Desc, - Path: route.Forward, - KeepAlive: route.KeepAlive, - } - - reverseRoute := routing.Route{ - Desc: route.Desc.Invert(), - Path: route.Reverse, - KeepAlive: route.KeepAlive, - } + // TODO: Is there a better way to do timeout? + ctx, cancel := context.WithTimeout(context.Background(), 1*time.Minute) + defer cancel() - // Determine the rules to send to visors using loop descriptor and reserved route IDs. - forwardRules, consumeRules, intermediaryRules, err := idr.GenerateRules(forwardRoute, reverseRoute) + initRules, err := g.sn.handleDialRouteGroup(ctx, route) if err != nil { return err } - g.logger.Infof("generated forward rules: %v", forwardRules) - g.logger.Infof("generated consume rules: %v", consumeRules) - g.logger.Infof("generated intermediary rules: %v", intermediaryRules) - - errCh := make(chan error, len(intermediaryRules)) - var wg sync.WaitGroup - - for pk, rules := range intermediaryRules { - wg.Add(1) - pk, rules := pk, rules - go func() { - defer wg.Done() - if _, err := routerclient.AddIntermediaryRules(ctx, g.logger, g.sn.dmsgC, pk, rules); err != nil { - g.logger.WithField("remote", pk).WithError(err).Warn("failed to add rules") - errCh <- err - } - }() - } - - wg.Wait() - close(errCh) - - if err := finalError(len(intermediaryRules), errCh); err != nil { - return err - } - - initRouteRules := routing.EdgeRules{ - Desc: forwardRoute.Desc, - Forward: forwardRules[route.Desc.SrcPK()], - Reverse: consumeRules[route.Desc.SrcPK()], - } - - respRouteRules := routing.EdgeRules{ - Desc: reverseRoute.Desc, - Forward: forwardRules[route.Desc.DstPK()], - Reverse: consumeRules[route.Desc.DstPK()], - } - - // Confirm routes with responding visor. - ok, err := routerclient.AddEdgeRules(ctx, g.logger, g.sn.dmsgC, route.Desc.DstPK(), respRouteRules) - if err != nil || !ok { - return fmt.Errorf("failed to confirm loop with destination visor: %v", err) - } // Confirm routes with initiating visor. - *rules = initRouteRules + *rules = initRules return nil } - -func (g *RPCGateway) reserveRouteIDs(ctx context.Context, route routing.BidirectionalRoute) (*idReservoir, error) { - reservoir, total := newIDReservoir(route.Forward, route.Reverse) - g.logger.Infof("There are %d route IDs to reserve.", total) - - err := reservoir.ReserveIDs(ctx, g.logger, g.sn.dmsgC, routerclient.ReserveIDs) - if err != nil { - g.logger.WithError(err).Warnf("Failed to reserve route IDs.") - return nil, err - } - g.logger.Infof("Successfully reserved route IDs.") - return reservoir, err -} diff --git a/pkg/setup/setupclient/client.go b/pkg/setup/setupclient/client.go index d2476fc05d..0da97c7c17 100644 --- a/pkg/setup/setupclient/client.go +++ b/pkg/setup/setupclient/client.go @@ -15,18 +15,18 @@ import ( const rpcName = "Gateway" type Client struct { - log *logging.Logger - n *snet.Network - nodes []cipher.PubKey - conn *snet.Conn - rpc *rpc.Client + log *logging.Logger + n *snet.Network + setupNodes []cipher.PubKey + conn *snet.Conn + rpc *rpc.Client } -func NewClient(ctx context.Context, log *logging.Logger, n *snet.Network, nodes []cipher.PubKey) (*Client, error) { +func NewClient(ctx context.Context, log *logging.Logger, n *snet.Network, setupNodes []cipher.PubKey) (*Client, error) { client := &Client{ - log: log, - n: n, - nodes: nodes, + log: log, + n: n, + setupNodes: setupNodes, } conn, err := client.dial(ctx) @@ -41,7 +41,7 @@ func NewClient(ctx context.Context, log *logging.Logger, n *snet.Network, nodes } func (c *Client) dial(ctx context.Context) (*snet.Conn, error) { - for _, sPK := range c.nodes { + for _, sPK := range c.setupNodes { conn, err := c.n.Dial(ctx, snet.DmsgType, sPK, snet.SetupPort) if err != nil { c.log.WithError(err).Warnf("failed to dial to setup node: setupPK(%s)", sPK) diff --git a/pkg/setup/setupclient/wrappers.go b/pkg/setup/setupclient/wrappers.go index ea99461c94..8d40580059 100644 --- a/pkg/setup/setupclient/wrappers.go +++ b/pkg/setup/setupclient/wrappers.go @@ -11,10 +11,10 @@ import ( "github.com/skycoin/skywire/pkg/snet" ) -func DialRouteGroup(ctx context.Context, log *logging.Logger, n *snet.Network, nodes []cipher.PubKey, +func DialRouteGroup(ctx context.Context, log *logging.Logger, n *snet.Network, setupNodes []cipher.PubKey, req routing.BidirectionalRoute) (routing.EdgeRules, error) { - client, err := NewClient(ctx, log, n, nodes) + client, err := NewClient(ctx, log, n, setupNodes) if err != nil { return routing.EdgeRules{}, err } diff --git a/pkg/visor/rpc_client.go b/pkg/visor/rpc_client.go index 7badd0cf2b..b664542202 100644 --- a/pkg/visor/rpc_client.go +++ b/pkg/visor/rpc_client.go @@ -249,24 +249,24 @@ func NewMockRPCClient(r *rand.Rand, maxTps int, maxRules int) (cipher.PubKey, RP } lp := routing.Port(binary.BigEndian.Uint16(lpRaw[:])) rp := routing.Port(binary.BigEndian.Uint16(rpRaw[:])) - fwdRID, err := rt.ReserveKey() + fwdRID, err := rt.ReserveKeys(1) if err != nil { panic(err) } - fwdRule := routing.IntermediaryForwardRule(ruleKeepAlive, fwdRID, routing.RouteID(r.Uint32()), uuid.New()) + fwdRule := routing.IntermediaryForwardRule(ruleKeepAlive, fwdRID[0], routing.RouteID(r.Uint32()), uuid.New()) if err := rt.SaveRule(fwdRule); err != nil { panic(err) } - appRID, err := rt.ReserveKey() + appRID, err := rt.ReserveKeys(1) if err != nil { panic(err) } - consumeRule := routing.ConsumeRule(ruleKeepAlive, appRID, remotePK, lp, rp) + consumeRule := routing.ConsumeRule(ruleKeepAlive, appRID[0], remotePK, lp, rp) if err := rt.SaveRule(consumeRule); err != nil { panic(err) } log.Infof("rt[%2da]: %v %v", i, fwdRID, fwdRule.Summary().ForwardFields) - log.Infof("rt[%2db]: %v %v", i, appRID, consumeRule.Summary().ConsumeFields) + log.Infof("rt[%2db]: %v %v", i, appRID[0], consumeRule.Summary().ConsumeFields) } log.Printf("rtCount: %d", rt.Count()) client := &mockRPCClient{ diff --git a/pkg/visor/visor_test.go b/pkg/visor/visor_test.go index fea4a3e624..a8c4aa71ec 100644 --- a/pkg/visor/visor_test.go +++ b/pkg/visor/visor_test.go @@ -264,7 +264,7 @@ func (r *mockRouter) DialRoutes(ctx context.Context, rPK cipher.PubKey, lPort, r panic("implement me") } -func (r *mockRouter) AcceptRoutes() (*router.RouteGroup, error) { +func (r *mockRouter) AcceptRoutes(ctx context.Context) (*router.RouteGroup, error) { panic("implement me") }