Skip to content

Commit

Permalink
Make tests fail
Browse files Browse the repository at this point in the history
  • Loading branch information
Darkren committed Aug 13, 2019
1 parent de52f62 commit 27c7415
Show file tree
Hide file tree
Showing 2 changed files with 196 additions and 22 deletions.
176 changes: 165 additions & 11 deletions pkg/setup/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ import (
"sync"
"time"

"github.com/google/uuid"

"github.com/skycoin/dmsg/cipher"
"github.com/skycoin/dmsg/disc"
"github.com/skycoin/skycoin/src/util/logging"
Expand Down Expand Up @@ -125,11 +127,15 @@ func (sn *Node) createLoop(ld routing.LoopDescriptor) error {
return err
}

fmt.Println("FINISHED SETUP OF REVERSE ROUTE")

fRouteID, err := sn.createRoute(ld.Expiry, ld.Forward, ld.Loop.Remote.Port, ld.Loop.Local.Port)
if err != nil {
return err
}

fmt.Println("FINISHED SETUP OF FORWARD ROUTE")

if len(ld.Forward) == 0 || len(ld.Reverse) == 0 {
return nil
}
Expand All @@ -155,6 +161,8 @@ func (sn *Node) createLoop(ld routing.LoopDescriptor) error {
return fmt.Errorf("loop connect: %s", err)
}

fmt.Println("FINISHED CONNECTLOOP REQUEST")

ldI := routing.LoopData{
Loop: routing.Loop{
Remote: routing.Addr{
Expand Down Expand Up @@ -192,19 +200,32 @@ func (sn *Node) createRoute(expireAt time.Time, route routing.Route, rport, lpor

// indicate errors occurred during rules setup
rulesSetupErrs := make(chan error, len(r))
routeIDsCh := make([]chan routing.RouteID, 0, len(r))
for range r {
routeIDsCh = append(routeIDsCh, make(chan routing.RouteID))
}

// context to cancel rule setup in case of errors
ctx, cancel := context.WithCancel(context.Background())
for idx := len(r) - 1; idx >= 0; idx-- {
/*for idx := len(r) - 1; idx >= 0; idx-- {
hop := &Hop{Hop: route[idx]}
r[idx] = hop
var nextHop *Hop
toPK := hop.To.Hex()
var routeIDChIn, routeIDChOut chan routing.RouteID
if idx > 0 {
routeIDChOut = routeIDsCh[idx-1]
}
if idx != len(r)-1 {
nextHop = r[idx+1]
routeIDChIn = routeIDsCh[idx]
}

go func(idx int, hop, nextHop *Hop) {
var nextTransport uuid.UUID
if idx != len(r)-1 {
nextTransport = route[idx+1].Transport
}
go func(idx int, hop *Hop, routeIDChIn, routeIDChOut chan routing.RouteID, nextTransport uuid.UUID) {
fmt.Printf("Sending RequestRouteID to %v\n", toPK)
routeID, err := sn.requestRouteID(ctx, hop.To)
if err != nil {
// filter out context cancellation errors
Expand All @@ -215,16 +236,27 @@ func (sn *Node) createRoute(expireAt time.Time, route routing.Route, rport, lpor
}
return
}
fmt.Printf("Got RouteID %v from %v\n", routeID, toPK)
hop.routeID = routeID
if routeIDChOut != nil {
routeIDChOut <- routeID
}
var nextRouteID routing.RouteID
if routeIDChIn != nil {
nextRouteID = <-routeIDChIn
}
fmt.Printf("Got nextRouteID %v from chan by goroutine communicating with %v\n", nextRouteID, toPK)
var rule routing.Rule
if nextHop == nil {
rule = routing.AppRule(expireAt, 0, initiator, lport, rport, routeID)
if idx == len(r)-1 {
rule = routing.AppRule(expireAt, 0, initiator, lport, rport, hop.routeID)
} else {
rule = routing.ForwardRule(expireAt, nextHop.routeID, nextHop.Transport, routeID)
rule = routing.ForwardRule(expireAt, nextRouteID, nextTransport, hop.routeID)
}
fmt.Printf("Sending AddRule with RouteID %v to %v\n", routeID, toPK)
err = sn.setupRule(ctx, hop.To, rule)
if err != nil {
// filter out context cancellation errors
Expand All @@ -234,11 +266,104 @@ func (sn *Node) createRoute(expireAt time.Time, route routing.Route, rport, lpor
rulesSetupErrs <- fmt.Errorf("rule setup: %s", err)
}
return
//break
}
fmt.Printf("Got response from AddRule from %v\n", toPK)
// put nil to avoid block
rulesSetupErrs <- nil
}(idx, hop, routeIDChIn, routeIDChOut, nextTransport)
}*/

for idx := len(r) - 1; idx >= 0; idx-- {
hop := &Hop{Hop: route[idx]}
r[idx] = hop

toPK := hop.To.Hex()

var routeIDChIn, routeIDChOut chan routing.RouteID
if idx > 0 {
routeIDChOut = routeIDsCh[idx-1]
}
if idx != len(r)-1 {
routeIDChIn = routeIDsCh[idx]
}
var nextTransport uuid.UUID
if idx != len(r)-1 {
nextTransport = route[idx+1].Transport
}
go func(idx int, hop *Hop, routeIDChIn, routeIDChOut chan routing.RouteID, nextTransport uuid.UUID) {
fmt.Printf("Sending RequestRouteID to %v\n", toPK)
sn.Logger.Debugf("dialing to %s to request route ID\n", toPK)
tr, err := sn.messenger.Dial(ctx, hop.To)
if err != nil {
// filter out context cancellation errors
if err == context.Canceled {
rulesSetupErrs <- err
} else {
rulesSetupErrs <- fmt.Errorf("rule setup: transport: %s", err)
}
return
}
defer func() {
if err := tr.Close(); err != nil {
sn.Logger.Warnf("Failed to close transport: %s", err)
}
}()

proto := NewSetupProtocol(tr)
routeID, err := RequestRouteID(proto)
if err != nil {
// filter out context cancellation errors
if err == context.Canceled {
rulesSetupErrs <- err
} else {
rulesSetupErrs <- fmt.Errorf("rule setup: %s", err)
}
return
}

sn.Logger.Infof("Received route ID %d from %s", routeID, hop.To)

fmt.Printf("Got RouteID %v from %v\n", routeID, toPK)

hop.routeID = routeID

if routeIDChOut != nil {
routeIDChOut <- routeID
}
var nextRouteID routing.RouteID
if routeIDChIn != nil {
nextRouteID = <-routeIDChIn
}
fmt.Printf("Got nextRouteID %v from chan by goroutine communicating with %v\n", nextRouteID, toPK)

var rule routing.Rule
if idx == len(r)-1 {
rule = routing.AppRule(expireAt, 0, initiator, lport, rport, hop.routeID)
} else {
rule = routing.ForwardRule(expireAt, nextRouteID, nextTransport, hop.routeID)
}

fmt.Printf("Sending AddRule with RouteID %v to %v\n", routeID, toPK)
sn.Logger.Debugf("dialing to %s to setup rule: %v\n", hop.To, rule)

if err := AddRule(proto, rule); err != nil {
// filter out context cancellation errors
if err == context.Canceled {
rulesSetupErrs <- err
} else {
rulesSetupErrs <- fmt.Errorf("rule setup: %s", err)
}
return
}

sn.Logger.Infof("Set rule of type %s on %s", rule.Type(), hop.To)
fmt.Printf("Got response from AddRule from %v\n", toPK)

// put nil to avoid block
rulesSetupErrs <- nil
}(idx, hop, nextHop)
}(idx, hop, routeIDChIn, routeIDChOut, nextTransport)
}

var rulesSetupErr error
Expand All @@ -255,20 +380,49 @@ func (sn *Node) createRoute(expireAt time.Time, route routing.Route, rport, lpor

// close chan to avoid leaks
close(rulesSetupErrs)
for _, ch := range routeIDsCh {
close(ch)
}
if rulesSetupErr != nil {
return 0, rulesSetupErr
}

routeID, err := sn.requestRouteID(context.Background(), initiator)
fmt.Println("FINISHED ASYNC SETUP")

/*routeID, err := sn.requestRouteID(context.Background(), initiator)
if err != nil {
return 0, fmt.Errorf("request route id: %s", err)
}*/

sn.Logger.Debugf("dialing to %s to request route ID\n", initiator)
tr, err := sn.messenger.Dial(ctx, initiator)
if err != nil {
return 0, fmt.Errorf("transport: %s", err)
}
defer func() {
if err := tr.Close(); err != nil {
sn.Logger.Warnf("Failed to close transport: %s", err)
}
}()

proto := NewSetupProtocol(tr)
routeID, err := RequestRouteID(proto)
if err != nil {
return 0, err
}

fmt.Println("FINISHED LAST STEP ROUTE ID REQUEST")

rule := routing.ForwardRule(expireAt, r[0].routeID, r[0].Transport, routeID)
if err := sn.setupRule(context.Background(), initiator, rule); err != nil {
/*if err := sn.setupRule(context.Background(), initiator, rule); err != nil {
return 0, fmt.Errorf("rule setup: %s", err)
}*/
if err := AddRule(proto, rule); err != nil {
return 0, fmt.Errorf("rule setup: %s", err)
}

fmt.Println("FINISHED LAST STEP SETUP RULE REQUEST")

return routeID, nil
}

Expand Down
42 changes: 31 additions & 11 deletions pkg/setup/node_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,24 +133,37 @@ func TestNode(t *testing.T) {
var nextRouteID uint32
// CLOSURE: emulates how a visor node should react when expecting an AddRules packet.
expectAddRules := func(client int, expRule routing.RuleType) {
clientPK := clients[client].Local().Hex()
fmt.Printf("Client %v has PK %v\n", client, clientPK)
tp, err := clients[client].Accept(context.TODO())
require.NoError(t, err)
defer func() { require.NoError(t, tp.Close()) }()
fmt.Printf("Accepted 1st time by %v\n", clientPK)

proto := NewSetupProtocol(tp)

pt, _, err := proto.ReadPacket()
require.NoError(t, err)
require.Equal(t, PacketRequestRouteID, pt)
fmt.Printf("Received RequestRouteID by %v\n", clientPK)

routeID := atomic.AddUint32(&nextRouteID, 1)

err = proto.WritePacket(RespSuccess, []routing.RouteID{routing.RouteID(routeID)})
require.NoError(t, err)
fmt.Printf("Sent RespSuccess for RequestRouteID with RouteID %v by %v\n", routeID, clientPK)

/*require.NoError(t, tp.Close())
tp, err = clients[client].Accept(context.TODO())
require.NoError(t, err)
fmt.Printf("Called Accept for second time by %v\n", clientPK)
proto = NewSetupProtocol(tp)*/

pt, pp, err := proto.ReadPacket()
require.NoError(t, err)
require.Equal(t, PacketAddRules, pt)
fmt.Printf("Received AddRules by %v\n", clientPK)

var rs []routing.Rule
require.NoError(t, json.Unmarshal(pp, &rs))
Expand All @@ -161,6 +174,9 @@ func TestNode(t *testing.T) {

err = proto.WritePacket(RespSuccess, nil)
require.NoError(t, err)
fmt.Printf("Sent RespSuccess for AddRules by %v\n", clientPK)

require.NoError(t, tp.Close())
}

// CLOSURE: emulates how a visor node should react when expecting an ConfirmLoop packet.
Expand Down Expand Up @@ -192,16 +208,20 @@ func TestNode(t *testing.T) {
_ = proto.WritePacket(RespSuccess, nil) //nolint:errcheck
}

expectAddRules(4, routing.RuleApp)
expectAddRules(3, routing.RuleForward)
expectAddRules(2, routing.RuleForward)
expectAddRules(1, routing.RuleForward)
expectAddRules(1, routing.RuleApp)
expectAddRules(2, routing.RuleForward)
expectAddRules(3, routing.RuleForward)
expectAddRules(4, routing.RuleForward)
expectConfirmLoop(1)
expectConfirmLoop(4)
go expectAddRules(4, routing.RuleApp)
go expectAddRules(3, routing.RuleForward)
go expectAddRules(2, routing.RuleForward)
go expectAddRules(1, routing.RuleForward)
time.Sleep(4000 * time.Millisecond)
go expectAddRules(1, routing.RuleApp)
go expectAddRules(2, routing.RuleForward)
go expectAddRules(3, routing.RuleForward)
go expectAddRules(4, routing.RuleForward)
time.Sleep(4 * time.Second)
go expectConfirmLoop(1)
time.Sleep(4 * time.Second)
go expectConfirmLoop(4)
time.Sleep(4 * time.Second)
})

// TEST: Emulates the communication between 2 visor nodes and a setup nodes,
Expand Down

0 comments on commit 27c7415

Please sign in to comment.