Skip to content

Commit

Permalink
Began work to fix bug where visor node restart does not reestablish r…
Browse files Browse the repository at this point in the history
…outes.
  • Loading branch information
Evan Lin committed Sep 8, 2019
1 parent 7d712a3 commit 5512d1b
Show file tree
Hide file tree
Showing 4 changed files with 39 additions and 60 deletions.
4 changes: 2 additions & 2 deletions pkg/setup/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@ import (

// Various timeouts for setup node.
const (
ServeTransportTimeout = time.Second * 30
ReadTimeout = time.Second * 10
RequestTimeout = time.Second * 30
ReadTimeout = time.Second * 10
)

// Config defines configuration parameters for setup Node.
Expand Down
71 changes: 35 additions & 36 deletions pkg/setup/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"encoding/json"
"errors"
"fmt"
"sync"
"time"

"github.com/google/uuid"
Expand Down Expand Up @@ -78,15 +77,15 @@ func (sn *Node) Serve(ctx context.Context) error {
return err
}
go func(conn *dmsg.Transport) {
if err := sn.serveTransport(ctx, conn); err != nil {
if err := sn.handleRequest(ctx, conn); err != nil {
sn.Logger.Warnf("Failed to serve Transport: %s", err)
}
}(conn)
}
}

func (sn *Node) serveTransport(ctx context.Context, tr *dmsg.Transport) error {
ctx, cancel := context.WithTimeout(ctx, ServeTransportTimeout)
func (sn *Node) handleRequest(ctx context.Context, tr *dmsg.Transport) error {
ctx, cancel := context.WithTimeout(ctx, RequestTimeout)
defer cancel()

proto := NewSetupProtocol(tr)
Expand All @@ -95,36 +94,44 @@ func (sn *Node) serveTransport(ctx context.Context, tr *dmsg.Transport) error {
return err
}

sn.Logger.Infof("Got new Setup request with type %s: %s", sp, string(data))
defer sn.Logger.Infof("Completed Setup request with type %s: %s", sp, string(data))
log := sn.Logger.WithField("requester", tr.RemotePK()).WithField("reqType", sp)
log.Infof("Received request.")

startTime := time.Now()

switch sp {
case PacketCreateLoop:
var ld routing.LoopDescriptor
if err = json.Unmarshal(data, &ld); err == nil {
err = sn.createLoop(ctx, ld)
if err = json.Unmarshal(data, &ld); err != nil {
break
}
ldJson, _ := json.MarshalIndent(ld, "", "\t")
log.Infof("CreateLoop loop descriptor: %s", string(ldJson))
err = sn.createLoop(ctx, ld)

case PacketCloseLoop:
var ld routing.LoopData
if err = json.Unmarshal(data, &ld); err == nil {
err = sn.closeLoop(ctx, ld.Loop.Remote.PubKey, routing.LoopData{
Loop: routing.Loop{
Remote: ld.Loop.Local,
Local: ld.Loop.Remote,
},
})
if err = json.Unmarshal(data, &ld); err != nil {
break
}
err = sn.closeLoop(ctx, ld.Loop.Remote.PubKey, routing.LoopData{
Loop: routing.Loop{
Remote: ld.Loop.Local,
Local: ld.Loop.Remote,
},
})

default:
err = errors.New("unknown foundation packet")
}
sn.metrics.Record(time.Since(startTime), err != nil)

if err != nil {
sn.Logger.Infof("Setup request with type %s failed: %s", sp, err)
log.WithError(err).Warnf("Request completed with error.")
return proto.WritePacket(RespFailure, err)
}

log.Infof("Request completed successfully.")
return proto.WritePacket(RespSuccess, nil)
}

Expand Down Expand Up @@ -215,13 +222,12 @@ func (sn *Node) createLoop(ctx context.Context, ld routing.LoopDescriptor) error
//
// During the setup process each error received along the way causes all the procedure to be canceled. RouteID received
// from the 1st step connecting to the initiating node is used as the ID for the overall rule, thus being returned.
func (sn *Node) createRoute(ctx context.Context, keepAlive time.Duration, route routing.Route,
rport, lport routing.Port) (routing.RouteID, error) {
func (sn *Node) createRoute(ctx context.Context, keepAlive time.Duration, route routing.Route, rPort, lPort routing.Port) (routing.RouteID, error) {
if len(route) == 0 {
return 0, nil
}

sn.Logger.Infof("Creating new Route %s", route)
sn.Logger.Infof("Creating a new Route %s", route)

// add the initiating node to the start of the route. We need to loop over all the visor nodes
// along the route to apply rules including the initiating one
Expand Down Expand Up @@ -251,7 +257,7 @@ func (sn *Node) createRoute(ctx context.Context, keepAlive time.Duration, route
resultingRouteIDCh := make(chan routing.RouteID, 2)

// context to cancel rule setup in case of errors
cancellableCtx, cancel := context.WithCancel(ctx)
ctx, cancel := context.WithCancel(ctx)
for i := len(r) - 1; i >= 0; i-- {
var reqIDChIn, reqIDChOut chan routing.RouteID
// goroutine[0] doesn't need to pass the route ID from the 1st step to anyone
Expand All @@ -268,12 +274,11 @@ func (sn *Node) createRoute(ctx context.Context, keepAlive time.Duration, route
nextTpID = r[i+1].Transport
rule = routing.ForwardRule(keepAlive, 0, nextTpID, 0)
} else {
rule = routing.AppRule(keepAlive, 0, 0, init, lport, rport)
rule = routing.AppRule(keepAlive, 0, 0, init, lPort, rPort)
}

go func(i int, pk cipher.PubKey, rule routing.Rule, reqIDChIn <-chan routing.RouteID,
reqIDChOut chan<- routing.RouteID) {
routeID, err := sn.setupRule(cancellableCtx, pk, rule, reqIDChIn, reqIDChOut)
go func(i int, pk cipher.PubKey, rule routing.Rule, reqIDChIn <-chan routing.RouteID, reqIDChOut chan<- routing.RouteID) {
routeID, err := sn.setupRule(ctx, pk, rule, reqIDChIn, reqIDChOut)
// adding rule for initiator must result with a route ID for the overall route
// it doesn't matter for now if there was an error, this result will be fetched only if there wasn't one
if i == 0 {
Expand All @@ -295,17 +300,16 @@ func (sn *Node) createRoute(ctx context.Context, keepAlive time.Duration, route
}

var rulesSetupErr error
var cancelOnce sync.Once
// check for any errors occurred so far
for range r {
// filter out context cancellation errors
if err := <-rulesSetupErrs; err != nil && err != context.Canceled {
// rules setup failed, cancel further setup
cancelOnce.Do(cancel)
cancel()
rulesSetupErr = err
}
}
cancelOnce.Do(cancel)
cancel()

// close chan to avoid leaks
close(rulesSetupErrs)
Expand Down Expand Up @@ -349,11 +353,7 @@ func (sn *Node) Close() error {
}

func (sn *Node) closeLoop(ctx context.Context, on cipher.PubKey, ld routing.LoopData) error {
fmt.Printf(">>> BEGIN: closeLoop(%s, ld)\n", on)
defer fmt.Printf(">>> END: closeLoop(%s, ld)\n", on)

proto, err := sn.dialAndCreateProto(ctx, on)
fmt.Println(">>> *****: closeLoop() dialed:", err)
if err != nil {
return err
}
Expand All @@ -367,10 +367,9 @@ func (sn *Node) closeLoop(ctx context.Context, on cipher.PubKey, ld routing.Loop
return nil
}

func (sn *Node) setupRule(ctx context.Context, pk cipher.PubKey, rule routing.Rule,
reqIDChIn <-chan routing.RouteID, reqIDChOut chan<- routing.RouteID) (routing.RouteID, error) {
sn.Logger.Debugf("trying to setup setup rule: %v with %s\n", rule, pk)
requestRouteID, err := sn.requestRouteID(ctx, pk)
func (sn *Node) setupRule(ctx context.Context, pk cipher.PubKey, rule routing.Rule, reqIDChIn <-chan routing.RouteID, reqIDChOut chan<- routing.RouteID) (routing.RouteID, error) {
sn.Logger.Debugf("trying to setup setup rule: %v with %s", rule, pk)
requestRouteID, err := sn.requestRouteID(ctx, pk) // take this.
if err != nil {
return 0, err
}
Expand All @@ -386,7 +385,7 @@ func (sn *Node) setupRule(ctx context.Context, pk cipher.PubKey, rule routing.Ru

rule.SetRequestRouteID(requestRouteID)

sn.Logger.Debugf("dialing to %s to setup rule: %v\n", pk, rule)
sn.Logger.Debugf("dialing to %s to setup rule: %v", pk, rule)

if err := sn.addRule(ctx, pk, rule); err != nil {
return 0, err
Expand Down
3 changes: 2 additions & 1 deletion pkg/transport/managed_transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ func (mt *ManagedTransport) Serve(readCh chan<- routing.Packet, done <-chan stru
mt.connMx.Unlock()
}()

// Read loop.
go func() {
defer func() {
mt.log.Infof("closed readPacket loop.")
Expand All @@ -133,6 +134,7 @@ func (mt *ManagedTransport) Serve(readCh chan<- routing.Packet, done <-chan stru
}
}()

// Redial loop.
for {
select {
case <-mt.done:
Expand Down Expand Up @@ -225,7 +227,6 @@ func (mt *ManagedTransport) Dial(ctx context.Context) error {
return mt.dial(ctx)
}

// TODO: Figure out where this fella is called.
func (mt *ManagedTransport) dial(ctx context.Context) error {
tp, err := mt.n.Dial(mt.netName, mt.rPK, snet.TransportPort)
if err != nil {
Expand Down
21 changes: 0 additions & 21 deletions pkg/transport/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,27 +116,6 @@ func (tm *Manager) serve(ctx context.Context) {
}
}

// TODO(nkryuchkov): either use or remove if unused
// func (tm *Manager) initTransports(ctx context.Context) {
// tm.mx.Lock()
// defer tm.mx.Unlock()
//
// entries, err := tm.conf.DiscoveryClient.GetTransportsByEdge(ctx, tm.conf.PubKey)
// if err != nil {
// log.Warnf("No transports found for local node: %v", err)
// }
// for _, entry := range entries {
// var (
// tpType = entry.Entry.Type
// remote = entry.Entry.RemoteEdge(tm.conf.PubKey)
// tpID = entry.Entry.ID
// )
// if _, err := tm.saveTransport(remote, tpType); err != nil {
// tm.Logger.Warnf("INIT: failed to init tp: type(%s) remote(%s) tpID(%s)", tpType, remote, tpID)
// }
// }
// }

func (tm *Manager) acceptTransport(ctx context.Context, lis *snet.Listener) error {
conn, err := lis.AcceptConn() // TODO: tcp panic.
if err != nil {
Expand Down

0 comments on commit 5512d1b

Please sign in to comment.