Skip to content

Commit

Permalink
Minor fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
nkryuchkov committed Oct 8, 2019
1 parent 28c5ed2 commit 59aefb5
Show file tree
Hide file tree
Showing 12 changed files with 210 additions and 155 deletions.
69 changes: 47 additions & 22 deletions pkg/router/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,11 +56,13 @@ type DialOptions struct {
MaxConsumeRts int
}

var DefaultDialOptions = &DialOptions{
MinForwardRts: 1,
MaxForwardRts: 1,
MinConsumeRts: 1,
MaxConsumeRts: 1,
func DefaultDialOptions() DialOptions {
return DialOptions{
MinForwardRts: 1,
MaxForwardRts: 1,
MinConsumeRts: 1,
MaxConsumeRts: 1,
}
}

type Router interface {
Expand All @@ -80,7 +82,7 @@ type Router interface {
// Then the following should happen:
// - Save to routing.Table and internal RouteGroup map.
// - Return the RoutingGroup.
AcceptRoutes() (*RouteGroup, error)
AcceptRoutes(context.Context) (*RouteGroup, error)

Serve(context.Context) error

Expand All @@ -92,18 +94,20 @@ type Router interface {
// rules and manages loops for apps.
type router struct {
mx sync.Mutex
once sync.Once
done chan struct{}
wg sync.WaitGroup
conf *Config
logger *logging.Logger
n *snet.Network
sl *snet.Listener
accept chan routing.EdgeRules
trustedNodes map[cipher.PubKey]struct{}
tm *transport.Manager
rt routing.Table
rfc rfclient.Client // route finder client
rgs map[routing.RouteDescriptor]*RouteGroup // route groups to push incoming reads from transports.
rpcSrv *rpc.Server
accept chan routing.EdgeRules
}

// New constructs a new Router.
Expand Down Expand Up @@ -183,8 +187,14 @@ func (r *router) DialRoutes(ctx context.Context, rPK cipher.PubKey, lPort, rPort
// Then the following should happen:
// - Save to routing.Table and internal RouteGroup map.
// - Return the RoutingGroup.
func (r *router) AcceptRoutes() (*RouteGroup, error) {
rules := <-r.accept
func (r *router) AcceptRoutes(ctx context.Context) (*RouteGroup, error) {
var rules routing.EdgeRules
select {
case <-ctx.Done():
return nil, ctx.Err()
case rules = <-r.accept:
break
}

if err := r.saveRoutingRules(rules.Forward, rules.Reverse); err != nil {
return nil, err
Expand Down Expand Up @@ -332,8 +342,17 @@ func (r *router) Close() error {
if r == nil {
return nil
}

r.logger.Info("Closing all App connections and Loops")

r.once.Do(func() {
close(r.done)

r.mx.Lock()
close(r.accept)
r.mx.Unlock()
})

if err := r.sl.Close(); err != nil {
r.logger.WithError(err).Warnf("closing route_manager returned error")
}
Expand Down Expand Up @@ -374,7 +393,8 @@ func (r *router) RemoveRouteDescriptor(desc routing.RouteDescriptor) {
func (r *router) fetchBestRoutes(source, destination cipher.PubKey, opts *DialOptions) (fwd routing.Path, rev routing.Path, err error) {
// TODO(nkryuchkov): use opts
if opts == nil {
opts = DefaultDialOptions
defaultOpts := DefaultDialOptions()
opts = &defaultOpts
}

r.logger.Infof("Requesting new routes from %s to %s", source, destination)
Expand Down Expand Up @@ -420,22 +440,27 @@ func (r *router) saveRoutingRules(rules ...routing.Rule) error {
return nil
}

func (r *router) occupyRouteID(n uint8) ([]routing.RouteID, error) {
var ids = make([]routing.RouteID, n)
for i := range ids {
routeID, err := r.rt.ReserveKey()
if err != nil {
return nil, err
}
ids[i] = routeID
}
return ids, nil
}

func (r *router) routeGroup(desc routing.RouteDescriptor) (*RouteGroup, bool) {
r.mx.Lock()
defer r.mx.Unlock()

rg, ok := r.rgs[desc]
return rg, ok
}

func (r *router) IntroduceRules(rules routing.EdgeRules) error {
select {
case <-r.done:
return io.ErrClosedPipe
default:
r.mx.Lock()
defer r.mx.Unlock()

select {
case r.accept <- rules:
return nil
case <-r.done:
return io.ErrClosedPipe
}
}
}
30 changes: 15 additions & 15 deletions pkg/router/router_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,15 +79,15 @@ func TestRouter_Serve(t *testing.T) {
defer clearRules(r0, r1)

// Add a FWD rule for r0.
fwdRtID, err := r0.rt.ReserveKey()
fwdRtID, err := r0.rt.ReserveKeys(1)
require.NoError(t, err)

fwdRule := routing.IntermediaryForwardRule(1*time.Hour, fwdRtID, routing.RouteID(5), tp1.Entry.ID)
fwdRule := routing.IntermediaryForwardRule(1*time.Hour, fwdRtID[0], routing.RouteID(5), tp1.Entry.ID)
err = r0.rt.SaveRule(fwdRule)
require.NoError(t, err)

// Call handleTransportPacket for r0 (this should in turn, use the rule we added).
packet := routing.MakeDataPacket(fwdRtID, []byte("This is a test!"))
packet := routing.MakeDataPacket(fwdRtID[0], []byte("This is a test!"))
require.NoError(t, r0.handleTransportPacket(context.TODO(), packet))

// r1 should receive the packet handled by r0.
Expand Down Expand Up @@ -199,33 +199,33 @@ func TestRouter_Rules(t *testing.T) {
t.Run("GetRule", func(t *testing.T) {
clearRules()

expiredID, err := r.rt.ReserveKey()
expiredID, err := r.rt.ReserveKeys(1)
require.NoError(t, err)

expiredRule := routing.IntermediaryForwardRule(-10*time.Minute, expiredID, 3, uuid.New())
expiredRule := routing.IntermediaryForwardRule(-10*time.Minute, expiredID[0], 3, uuid.New())
err = r.rt.SaveRule(expiredRule)
require.NoError(t, err)

id, err := r.rt.ReserveKey()
id, err := r.rt.ReserveKeys(1)
require.NoError(t, err)

rule := routing.IntermediaryForwardRule(10*time.Minute, id, 3, uuid.New())
rule := routing.IntermediaryForwardRule(10*time.Minute, id[0], 3, uuid.New())
err = r.rt.SaveRule(rule)
require.NoError(t, err)

defer r.rt.DelRules([]routing.RouteID{id, expiredID})
defer r.rt.DelRules([]routing.RouteID{id[0], expiredID[0]})

// rule should already be expired at this point due to the execution time.
// However, we'll just a bit to be sure
time.Sleep(1 * time.Millisecond)

_, err = r.GetRule(expiredID)
_, err = r.GetRule(expiredID[0])
require.Error(t, err)

_, err = r.GetRule(123)
require.Error(t, err)

r, err := r.GetRule(id)
r, err := r.GetRule(id[0])
require.NoError(t, err)
assert.Equal(t, rule, r)
})
Expand All @@ -236,10 +236,10 @@ func TestRouter_Rules(t *testing.T) {

pk, _ := cipher.GenerateKeyPair()

id, err := r.rt.ReserveKey()
id, err := r.rt.ReserveKeys(1)
require.NoError(t, err)

rule := routing.ConsumeRule(10*time.Minute, id, pk, 2, 3)
rule := routing.ConsumeRule(10*time.Minute, id[0], pk, 2, 3)
err = r.rt.SaveRule(rule)
require.NoError(t, err)

Expand Down Expand Up @@ -323,17 +323,17 @@ func TestRouter_Rules(t *testing.T) {

proto := setup.NewSetupProtocol(in)

id, err := r.rt.ReserveKey()
id, err := r.rt.ReserveKeys(1)
require.NoError(t, err)

rule := routing.IntermediaryForwardRule(10*time.Minute, id, 3, uuid.New())
rule := routing.IntermediaryForwardRule(10*time.Minute, id[0], 3, uuid.New())

err = r.rt.SaveRule(rule)
require.NoError(t, err)

assert.Equal(t, 1, rt.Count())

require.NoError(t, setup.DeleteRule(context.TODO(), proto, id))
require.NoError(t, setup.DeleteRule(context.TODO(), proto, id[0]))
assert.Equal(t, 0, rt.Count())
})

Expand Down
1 change: 1 addition & 0 deletions pkg/router/routerclient/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ func (c *Client) Close() error {
return nil
}

// TODO: make sure that deadline functions are used, then get rid of context here and below
func (c *Client) AddEdgeRules(ctx context.Context, rules routing.EdgeRules) (bool, error) {
var ok bool
err := c.call(ctx, rpcName+".AddEdgeRules", rules, &ok)
Expand Down
8 changes: 4 additions & 4 deletions pkg/router/rpc_gateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,9 @@ func NewRPCGateway(router *router) *RPCGateway {
}

func (r *RPCGateway) AddEdgeRules(rules routing.EdgeRules, ok *bool) error {
go func() {
r.router.accept <- rules
}()
if err := r.router.IntroduceRules(rules); err != nil {
return err
}

if err := r.router.saveRoutingRules(rules.Forward, rules.Reverse); err != nil {
*ok = false
Expand All @@ -46,7 +46,7 @@ func (r *RPCGateway) AddIntermediaryRules(rules []routing.Rule, ok *bool) error
}

func (r *RPCGateway) ReserveIDs(n uint8, routeIDs *[]routing.RouteID) error {
ids, err := r.router.occupyRouteID(n)
ids, err := r.router.rt.ReserveKeys(int(n))
if err != nil {
r.logger.WithError(err).Warnf("Request completed with error.")
return setup.Failure{Code: setup.FailureReserveRtIDs, Msg: err.Error()}
Expand Down
31 changes: 24 additions & 7 deletions pkg/routing/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@ var (

// Table represents a routing table implementation.
type Table interface {
// ReserveKey reserves a RouteID.
ReserveKey() (RouteID, error)
// ReserveKeys reserves n RouteIDs.
ReserveKeys(n int) ([]RouteID, error)

// SaveRule sets RoutingRule for a given RouteID.
SaveRule(Rule) error
Expand Down Expand Up @@ -80,16 +80,33 @@ func NewTable(config Config) Table {
return mt
}

func (mt *memTable) ReserveKey() (key RouteID, err error) {
func (mt *memTable) ReserveKeys(n int) ([]RouteID, error) {
first, last, err := mt.reserveKeysImpl(n)
if err != nil {
return nil, err
}

routes := make([]RouteID, 0, n)
for id := first; id <= last; id++ {
routes = append(routes, id)
}

return routes, nil
}

func (mt *memTable) reserveKeysImpl(n int) (first, last RouteID, err error) {
mt.Lock()
defer mt.Unlock()

if mt.nextID == math.MaxUint32 {
return 0, ErrNoAvailableRoutes
if int64(mt.nextID)+int64(n) >= math.MaxUint32 {
return 0, 0, ErrNoAvailableRoutes
}

mt.nextID++
return mt.nextID, nil
first = mt.nextID + 1
mt.nextID += RouteID(n)
last = mt.nextID

return first, last, nil
}

func (mt *memTable) SaveRule(rule Rule) error {
Expand Down
16 changes: 8 additions & 8 deletions pkg/routing/table_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,40 +30,40 @@ func TestMain(m *testing.M) {
func RoutingTableSuite(t *testing.T, tbl Table) {
t.Helper()

id, err := tbl.ReserveKey()
id, err := tbl.ReserveKeys(1)
require.NoError(t, err)

rule := IntermediaryForwardRule(15*time.Minute, id, 2, uuid.New())
rule := IntermediaryForwardRule(15*time.Minute, id[0], 2, uuid.New())
err = tbl.SaveRule(rule)
require.NoError(t, err)

assert.Equal(t, 1, tbl.Count())

r, err := tbl.Rule(id)
r, err := tbl.Rule(id[0])
require.NoError(t, err)
assert.Equal(t, rule, r)

id2, err := tbl.ReserveKey()
id2, err := tbl.ReserveKeys(1)
require.NoError(t, err)

rule2 := IntermediaryForwardRule(15*time.Minute, id2, 3, uuid.New())
rule2 := IntermediaryForwardRule(15*time.Minute, id2[0], 3, uuid.New())
err = tbl.SaveRule(rule2)
require.NoError(t, err)

assert.Equal(t, 2, tbl.Count())
require.NoError(t, tbl.SaveRule(rule))

r, err = tbl.Rule(id)
r, err = tbl.Rule(id[0])
require.NoError(t, err)
assert.Equal(t, rule, r)

ids := make([]RouteID, 0)
for _, rule := range tbl.AllRules() {
ids = append(ids, rule.KeyRouteID())
}
require.ElementsMatch(t, []RouteID{id, id2}, ids)
require.ElementsMatch(t, []RouteID{id[0], id2[0]}, ids)

tbl.DelRules([]RouteID{id, id2})
tbl.DelRules([]RouteID{id[0], id2[0]})
assert.Equal(t, 0, tbl.Count())
}

Expand Down
Loading

0 comments on commit 59aefb5

Please sign in to comment.