Skip to content

Commit

Permalink
Merge branch 'mainnet-milestone2' into feature/router2
Browse files Browse the repository at this point in the history
# Conflicts:
#	pkg/router/route_manager.go
#	pkg/router/route_manager_test.go
#	pkg/routing/rule.go
#	pkg/setup/node.go
  • Loading branch information
nkryuchkov committed Sep 10, 2019
2 parents 01276f7 + 6439c70 commit 3389d73
Show file tree
Hide file tree
Showing 10 changed files with 680 additions and 651 deletions.
20 changes: 14 additions & 6 deletions pkg/router/route_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ func (rm *routeManager) handleSetupConn(conn net.Conn) error {
case setup.PacketLoopClosed:
err = rm.loopClosed(body)
case setup.PacketRequestRouteID:
respBody, err = rm.occupyRouteID()
respBody, err = rm.occupyRouteID(body)
default:
err = errors.New("unknown foundation packet")
}
Expand Down Expand Up @@ -314,12 +314,20 @@ func (rm *routeManager) loopClosed(data []byte) error {
return rm.conf.OnLoopClosed(ld.Loop)
}

func (rm *routeManager) occupyRouteID() ([]routing.RouteID, error) {
rule := routing.IntermediaryForwardRule(DefaultRouteKeepAlive, 0, 0, uuid.UUID{})
routeID, err := rm.rt.AddRule(rule)
if err != nil {
func (rm *routeManager) occupyRouteID(data []byte) ([]routing.RouteID, error) {
var n uint8
if err := json.Unmarshal(data, &n); err != nil {
return nil, err
}

return []routing.RouteID{routeID}, nil
var ids = make([]routing.RouteID, n)
for i := range ids {
rule := routing.IntermediaryForwardRule(DefaultRouteKeepAlive, 0, 0, uuid.UUID{})
routeID, err := rm.rt.AddRule(rule)
if err != nil {
return nil, err
}
ids[i] = routeID
}
return ids, nil
}
12 changes: 6 additions & 6 deletions pkg/router/route_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,26 +113,26 @@ func TestNewRouteManager(t *testing.T) {
}()

// Emulate SetupNode sending RequestRegistrationID request.
id, err := setup.RequestRouteID(context.TODO(), setup.NewSetupProtocol(requestIDIn))
ids, err := setup.RequestRouteIDs(context.TODO(), setup.NewSetupProtocol(requestIDIn), 1)
require.NoError(t, err)

// Emulate SetupNode sending AddRule request.
rule := routing.IntermediaryForwardRule(10*time.Minute, id, 3, uuid.New())
err = setup.AddRule(context.TODO(), setup.NewSetupProtocol(addIn), rule)
rule := routing.IntermediaryForwardRule(10*time.Minute, ids[0], 3, uuid.New())
err = setup.AddRules(context.TODO(), setup.NewSetupProtocol(addIn), []routing.Rule{rule})
require.NoError(t, err)

// Check routing table state after AddRule.
assert.Equal(t, 1, rt.Count())
r, err := rt.Rule(id)
r, err := rt.Rule(ids[0])
require.NoError(t, err)
assert.Equal(t, rule, r)

// Emulate SetupNode sending RemoveRule request.
require.NoError(t, setup.DeleteRule(context.TODO(), setup.NewSetupProtocol(delIn), id))
require.NoError(t, setup.DeleteRule(context.TODO(), setup.NewSetupProtocol(delIn), ids[0]))

// Check routing table state after DeleteRule.
assert.Equal(t, 0, rt.Count())
r, err = rt.Rule(id)
r, err = rt.Rule(ids[0])
assert.Error(t, err)
assert.Nil(t, r)
}
Expand Down
19 changes: 12 additions & 7 deletions pkg/routing/rule.go
Original file line number Diff line number Diff line change
Expand Up @@ -272,15 +272,16 @@ func (d RouteDescriptor) DstPort() Port {
func (r Rule) String() string {
switch t := r.Type(); t {
case RuleConsume:
return fmt.Sprintf("App: <remote-pk: %s><remote-port: %d><local-port: %d>",
r.RouteDescriptor().DstPK(), r.RouteDescriptor().DstPort(), r.RouteDescriptor().SrcPK())
rd := r.RouteDescriptor()
return fmt.Sprintf("APP(keyRtID:%d, resRtID:%d, rPK:%s, rPort:%d, lPort:%d)",
r.KeyRouteID(), r.NextRouteID(), rd.DstPK(), rd.DstPort(), rd.SrcPK())
case RuleForward:
return fmt.Sprintf("Forward: <next-rid: %d><next-tid: %s><remote-pk: %s><remote-port: %d><local-port: %d>",
r.NextRouteID(), r.NextTransportID(),
r.RouteDescriptor().DstPK(), r.RouteDescriptor().DstPort(), r.RouteDescriptor().SrcPK())
rd := r.RouteDescriptor()
return fmt.Sprintf("FWD(keyRtID:%d, nxtRtID:%d, nxtTpID:%s, rPK:%s, rPort:%d, lPort:%d)",
r.KeyRouteID(), r.NextRouteID(), r.NextTransportID(), rd.DstPK(), rd.DstPort(), rd.SrcPK())
case RuleIntermediaryForward:
return fmt.Sprintf("IntermediaryForward: <next-rid: %d><next-tid: %s>",
r.NextRouteID(), r.NextTransportID())
return fmt.Sprintf("IFWD(keyRtID:%d, nxtRtID:%d, nxtTpID:%s)",
r.KeyRouteID(), r.NextRouteID(), r.NextTransportID())
default:
panic(fmt.Sprintf("%v: %v", invalidRule, t.String()))
}
Expand All @@ -294,6 +295,10 @@ type RouteDescriptorFields struct {
SrcPort Port `json:"src_port"`
}

//func (r Rule) MarshalJSON() ([]byte, error) {
// return json.Marshal(r.String())
//}

// RuleConsumeFields summarizes consume fields of a RoutingRule.
type RuleConsumeFields struct {
RouteDescriptor RouteDescriptorFields `json:"route_descriptor"`
Expand Down
4 changes: 2 additions & 2 deletions pkg/setup/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@ import (

// Various timeouts for setup node.
const (
ServeTransportTimeout = time.Second * 30
ReadTimeout = time.Second * 10
RequestTimeout = time.Second * 30
ReadTimeout = time.Second * 10
)

// Config defines configuration parameters for setup Node.
Expand Down
164 changes: 164 additions & 0 deletions pkg/setup/idreservoir.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,164 @@
package setup

import (
"context"
"encoding/json"
"errors"
"fmt"
"sync"
"time"

"github.com/skycoin/dmsg/cipher"

"github.com/skycoin/skywire/pkg/routing"
)

type idReservoir struct {
rec map[cipher.PubKey]uint8
ids map[cipher.PubKey][]routing.RouteID
mx sync.Mutex
}

func newIDReservoir(routes ...routing.Route) (*idReservoir, int) {
rec := make(map[cipher.PubKey]uint8)
var total int

for _, rt := range routes {
if len(rt) == 0 {
continue
}
rec[rt[0].From]++
for _, hop := range rt {
rec[hop.To]++
}
total += len(rt) + 1
}

return &idReservoir{
rec: rec,
ids: make(map[cipher.PubKey][]routing.RouteID),
}, total
}

func (idr *idReservoir) ReserveIDs(ctx context.Context, reserve func(ctx context.Context, pk cipher.PubKey, n uint8) ([]routing.RouteID, error)) error {
ctx, cancel := context.WithCancel(ctx)
defer cancel()

errCh := make(chan error, len(idr.rec))
defer close(errCh)

for pk, n := range idr.rec {
pk, n := pk, n
go func() {
ids, err := reserve(ctx, pk, n)
if err != nil {
errCh <- fmt.Errorf("reserve routeID from %s failed: %v", pk, err)
return
}
idr.mx.Lock()
idr.ids[pk] = ids
idr.mx.Unlock()
errCh <- nil
}()
}

return finalError(len(idr.rec), errCh)
}

func (idr *idReservoir) PopID(pk cipher.PubKey) (routing.RouteID, bool) {
idr.mx.Lock()
defer idr.mx.Unlock()

ids, ok := idr.ids[pk]
if !ok || len(ids) == 0 {
return 0, false
}

idr.ids[pk] = ids[1:]
return ids[0], true
}

// RulesMap associates a slice of rules to a visor's public key.
type RulesMap map[cipher.PubKey][]routing.Rule

func (rm RulesMap) String() string {
out := make(map[cipher.PubKey][]string, len(rm))
for pk, rules := range rm {
str := make([]string, len(rules))
for i, rule := range rules {
str[i] = rule.String()
}
out[pk] = str
}
jb, err := json.MarshalIndent(out, "", "\t")
if err != nil {
panic(err)
}
return string(jb)
}

// GenerateRules generates rules for a given LoopDescriptor.
// The outputs are as follows:
// - rules: a map that relates a slice of routing rules to a given visor's public key.
// - srcAppRID: the initiating node's route ID that references the FWD rule.
// - dstAppRID: the responding node's route ID that references the FWD rule.
// - err: an error (if any).
func GenerateRules(idc *idReservoir, ld routing.LoopDescriptor) (rules RulesMap, srcFwdRID, dstFwdRID routing.RouteID, err error) {
rules = make(RulesMap)
src, dst := ld.Loop.Local, ld.Loop.Remote

firstFwdRID, _, err := SaveForwardRules(rules, idc, ld.KeepAlive, ld.Forward)
if err != nil {
return nil, 0, 0, err
}
firstRevRID, _, err := SaveForwardRules(rules, idc, ld.KeepAlive, ld.Reverse)
if err != nil {
return nil, 0, 0, err
}

rules[src.PubKey] = append(rules[src.PubKey],
routing.ConsumeRule(ld.KeepAlive, firstRevRID, dst.PubKey, src.Port, dst.Port))
rules[dst.PubKey] = append(rules[dst.PubKey],
routing.ConsumeRule(ld.KeepAlive, firstFwdRID, src.PubKey, dst.Port, src.Port))

return rules, firstFwdRID, firstRevRID, nil
}

// SaveForwardRules creates the rules of the given route, and saves them in the 'rules' input.
// Note that the last rule for the route is always an APP rule, and so is not created here.
// The outputs are as follows:
// - firstRID: the first visor's route ID.
// - lastRID: the last visor's route ID (note that there is no rule set for this ID yet).
// - err: an error (if any).
func SaveForwardRules(rules RulesMap, idc *idReservoir, keepAlive time.Duration, route routing.Route) (firstRID, lastRID routing.RouteID, err error) {

// 'firstRID' is the first visor's key routeID - this is to be returned.
var ok bool
if firstRID, ok = idc.PopID(route[0].From); !ok {
return 0, 0, errors.New("fucked up")
}

var rID = firstRID
for _, hop := range route {
nxtRID, ok := idc.PopID(hop.To)
if !ok {
return 0, 0, errors.New("fucked up")
}
rule := routing.IntermediaryForwardRule(keepAlive, rID, nxtRID, hop.Transport)
rules[hop.From] = append(rules[hop.From], rule)

rID = nxtRID
}

return firstRID, rID, nil
}

func finalError(n int, errCh <-chan error) error {
var finalErr error
for i := 0; i < n; i++ {
if err := <-errCh; err != nil {
finalErr = err
}
}
return finalErr
}
Loading

0 comments on commit 3389d73

Please sign in to comment.