Skip to content

Commit

Permalink
Changed behaviour of setup.
Browse files Browse the repository at this point in the history
* Reserving route IDs and adding rules to visors is now split into two communication steps.
* Improved readability and testability of the setup procedure but splitting responsibilities to additional structures; setup.idReservoir, setup.RulesMap
* Improved logging for setup procedure.
* Slightly tweaked setup.Protocol to accommodate aforementioned changes.
  • Loading branch information
Evan Lin committed Sep 8, 2019
1 parent 5512d1b commit add2e38
Show file tree
Hide file tree
Showing 8 changed files with 637 additions and 603 deletions.
20 changes: 14 additions & 6 deletions pkg/router/route_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ func (rm *routeManager) handleSetupConn(conn net.Conn) error {
case setup.PacketLoopClosed:
err = rm.loopClosed(body)
case setup.PacketRequestRouteID:
respBody, err = rm.occupyRouteID()
respBody, err = rm.occupyRouteID(body)
default:
err = errors.New("unknown foundation packet")
}
Expand Down Expand Up @@ -312,12 +312,20 @@ func (rm *routeManager) loopClosed(data []byte) error {
return rm.conf.OnLoopClosed(ld.Loop)
}

func (rm *routeManager) occupyRouteID() ([]routing.RouteID, error) {
rule := routing.ForwardRule(DefaultRouteKeepAlive, 0, uuid.UUID{}, 0)
routeID, err := rm.rt.AddRule(rule)
if err != nil {
func (rm *routeManager) occupyRouteID(data []byte) ([]routing.RouteID, error) {
var n uint8
if err := json.Unmarshal(data, &n); err != nil {
return nil, err
}

return []routing.RouteID{routeID}, nil
var ids = make([]routing.RouteID, n)
for i := range ids {
rule := routing.ForwardRule(DefaultRouteKeepAlive, 0, uuid.UUID{}, 0)
routeID, err := rm.rt.AddRule(rule)
if err != nil {
return nil, err
}
ids[i] = routeID
}
return ids, nil
}
12 changes: 6 additions & 6 deletions pkg/router/route_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,26 +114,26 @@ func TestNewRouteManager(t *testing.T) {
}()

// Emulate SetupNode sending RequestRegistrationID request.
id, err := setup.RequestRouteID(context.TODO(), setup.NewSetupProtocol(requestIDIn))
ids, err := setup.RequestRouteIDs(context.TODO(), setup.NewSetupProtocol(requestIDIn), 1)
require.NoError(t, err)

// Emulate SetupNode sending AddRule request.
rule := routing.ForwardRule(10*time.Minute, 3, uuid.New(), id)
err = setup.AddRule(context.TODO(), setup.NewSetupProtocol(addIn), rule)
rule := routing.ForwardRule(10*time.Minute, 3, uuid.New(), ids[0])
err = setup.AddRules(context.TODO(), setup.NewSetupProtocol(addIn), []routing.Rule{rule})
require.NoError(t, err)

// Check routing table state after AddRule.
assert.Equal(t, 1, rt.Count())
r, err := rt.Rule(id)
r, err := rt.Rule(ids[0])
require.NoError(t, err)
assert.Equal(t, rule, r)

// Emulate SetupNode sending RemoveRule request.
require.NoError(t, setup.DeleteRule(context.TODO(), setup.NewSetupProtocol(delIn), id))
require.NoError(t, setup.DeleteRule(context.TODO(), setup.NewSetupProtocol(delIn), ids[0]))

// Check routing table state after DeleteRule.
assert.Equal(t, 0, rt.Count())
r, err = rt.Rule(id)
r, err = rt.Rule(ids[0])
assert.Error(t, err)
assert.Nil(t, r)
}
Expand Down
18 changes: 13 additions & 5 deletions pkg/routing/rule.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,14 +114,22 @@ func (r Rule) SetRequestRouteID(id RouteID) {
}

func (r Rule) String() string {
if r.Type() == RuleApp {
return fmt.Sprintf("App: <resp-rid: %d><remote-pk: %s><remote-port: %d><local-port: %d>",
r.RouteID(), r.RemotePK(), r.RemotePort(), r.LocalPort())
switch r.Type() {
case RuleApp:
return fmt.Sprintf("APP(keyRtID:%d, resRtID:%d, rPK:%s, rPort:%d, lPort:%d)",
r.RequestRouteID(), r.RouteID(), r.RemotePK(), r.RemotePort(), r.LocalPort())
case RuleForward:
return fmt.Sprintf("FWD(keyRtID:%d, nxtRtID:%d, nxtTpID:%s)",
r.RequestRouteID(), r.RouteID(), r.TransportID())
default:
return "invalid rule"
}

return fmt.Sprintf("Forward: <next-rid: %d><next-tid: %s>", r.RouteID(), r.TransportID())
}

//func (r Rule) MarshalJSON() ([]byte, error) {
// return json.Marshal(r.String())
//}

// RuleAppFields summarizes App fields of a RoutingRule.
type RuleAppFields struct {
RespRID RouteID `json:"resp_rid"`
Expand Down
164 changes: 164 additions & 0 deletions pkg/setup/idreservoir.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,164 @@
package setup

import (
"context"
"encoding/json"
"errors"
"fmt"
"sync"
"time"

"github.com/skycoin/dmsg/cipher"

"github.com/skycoin/skywire/pkg/routing"
)

type idReservoir struct {
rec map[cipher.PubKey]uint8
ids map[cipher.PubKey][]routing.RouteID
mx sync.Mutex
}

func newIDReservoir(routes ...routing.Route) (*idReservoir, int) {
rec := make(map[cipher.PubKey]uint8)
var total int

for _, rt := range routes {
if len(rt) == 0 {
continue
}
rec[rt[0].From]++
for _, hop := range rt {
rec[hop.To]++
}
total += len(rt) + 1
}

return &idReservoir{
rec: rec,
ids: make(map[cipher.PubKey][]routing.RouteID),
}, total
}

func (idr *idReservoir) ReserveIDs(ctx context.Context, reserve func(ctx context.Context, pk cipher.PubKey, n uint8) ([]routing.RouteID, error)) error {
ctx, cancel := context.WithCancel(ctx)
defer cancel()

errCh := make(chan error, len(idr.rec))
defer close(errCh)

for pk, n := range idr.rec {
pk, n := pk, n
go func() {
ids, err := reserve(ctx, pk, n)
if err != nil {
errCh <- fmt.Errorf("reserve routeID from %s failed: %v", pk, err)
return
}
idr.mx.Lock()
idr.ids[pk] = ids
idr.mx.Unlock()
errCh <- nil
}()
}

return finalError(len(idr.rec), errCh)
}

func (idr *idReservoir) PopID(pk cipher.PubKey) (routing.RouteID, bool) {
idr.mx.Lock()
defer idr.mx.Unlock()

ids, ok := idr.ids[pk]
if !ok || len(ids) == 0 {
return 0, false
}

idr.ids[pk] = ids[1:]
return ids[0], true
}

// RulesMap associates a slice of rules to a visor's public key.
type RulesMap map[cipher.PubKey][]routing.Rule

func (rm RulesMap) String() string {
out := make(map[cipher.PubKey][]string, len(rm))
for pk, rules := range rm {
str := make([]string, len(rules))
for i, rule := range rules {
str[i] = rule.String()
}
out[pk] = str
}
jb, err := json.MarshalIndent(out, "", "\t")
if err != nil {
panic(err)
}
return string(jb)
}

// GenerateRules generates rules for a given LoopDescriptor.
// The outputs are as follows:
// - rules: a map that relates a slice of routing rules to a given visor's public key.
// - srcAppRID: the initiating node's route ID that references the FWD rule.
// - dstAppRID: the responding node's route ID that references the FWD rule.
// - err: an error (if any).
func GenerateRules(idc *idReservoir, ld routing.LoopDescriptor) (rules RulesMap, srcFwdRID, dstFwdRID routing.RouteID, err error) {
rules = make(RulesMap)
src, dst := ld.Loop.Local, ld.Loop.Remote

firstFwdRID, lastFwdRID, err := SaveForwardRules(rules, idc, ld.KeepAlive, ld.Forward)
if err != nil {
return nil, 0, 0, err
}
firstRevRID, lastRevRID, err := SaveForwardRules(rules, idc, ld.KeepAlive, ld.Reverse)
if err != nil {
return nil, 0, 0, err
}

rules[src.PubKey] = append(rules[src.PubKey],
routing.AppRule(ld.KeepAlive, firstRevRID, lastFwdRID, dst.PubKey, src.Port, dst.Port))
rules[dst.PubKey] = append(rules[dst.PubKey],
routing.AppRule(ld.KeepAlive, firstFwdRID, lastRevRID, src.PubKey, dst.Port, src.Port))

return rules, firstFwdRID, firstRevRID, nil
}

// SaveForwardRules creates the rules of the given route, and saves them in the 'rules' input.
// Note that the last rule for the route is always an APP rule, and so is not created here.
// The outputs are as follows:
// - firstRID: the first visor's route ID.
// - lastRID: the last visor's route ID (note that there is no rule set for this ID yet).
// - err: an error (if any).
func SaveForwardRules(rules RulesMap, idc *idReservoir, keepAlive time.Duration, route routing.Route) (firstRID, lastRID routing.RouteID, err error) {

// 'firstRID' is the first visor's key routeID - this is to be returned.
var ok bool
if firstRID, ok = idc.PopID(route[0].From); !ok {
return 0, 0, errors.New("fucked up")
}

var rID = firstRID
for _, hop := range route {
nxtRID, ok := idc.PopID(hop.To)
if !ok {
return 0, 0, errors.New("fucked up")
}
rule := routing.ForwardRule(keepAlive, nxtRID, hop.Transport, rID)
rules[hop.From] = append(rules[hop.From], rule)

rID = nxtRID
}

return firstRID, rID, nil
}

func finalError(n int, errCh <-chan error) error {
var finalErr error
for i := 0; i < n; i++ {
if err := <-errCh; err != nil {
finalErr = err
}
}
return finalErr
}
Loading

0 comments on commit add2e38

Please sign in to comment.