diff --git a/pkg/setup/config.go b/pkg/setup/config.go index ccb4ddad30..e30becc02c 100644 --- a/pkg/setup/config.go +++ b/pkg/setup/config.go @@ -8,8 +8,8 @@ import ( // Various timeouts for setup node. const ( - ServeTransportTimeout = time.Second * 30 - ReadTimeout = time.Second * 10 + RequestTimeout = time.Second * 30 + ReadTimeout = time.Second * 10 ) // Config defines configuration parameters for setup Node. diff --git a/pkg/setup/node.go b/pkg/setup/node.go index 5577b2b2af..f37e0c1c9b 100644 --- a/pkg/setup/node.go +++ b/pkg/setup/node.go @@ -5,7 +5,6 @@ import ( "encoding/json" "errors" "fmt" - "sync" "time" "github.com/google/uuid" @@ -78,15 +77,15 @@ func (sn *Node) Serve(ctx context.Context) error { return err } go func(conn *dmsg.Transport) { - if err := sn.serveTransport(ctx, conn); err != nil { + if err := sn.handleRequest(ctx, conn); err != nil { sn.Logger.Warnf("Failed to serve Transport: %s", err) } }(conn) } } -func (sn *Node) serveTransport(ctx context.Context, tr *dmsg.Transport) error { - ctx, cancel := context.WithTimeout(ctx, ServeTransportTimeout) +func (sn *Node) handleRequest(ctx context.Context, tr *dmsg.Transport) error { + ctx, cancel := context.WithTimeout(ctx, RequestTimeout) defer cancel() proto := NewSetupProtocol(tr) @@ -95,36 +94,44 @@ func (sn *Node) serveTransport(ctx context.Context, tr *dmsg.Transport) error { return err } - sn.Logger.Infof("Got new Setup request with type %s: %s", sp, string(data)) - defer sn.Logger.Infof("Completed Setup request with type %s: %s", sp, string(data)) + log := sn.Logger.WithField("requester", tr.RemotePK()).WithField("reqType", sp) + log.Infof("Received request.") startTime := time.Now() + switch sp { case PacketCreateLoop: var ld routing.LoopDescriptor - if err = json.Unmarshal(data, &ld); err == nil { - err = sn.createLoop(ctx, ld) + if err = json.Unmarshal(data, &ld); err != nil { + break } + ldJson, _ := json.MarshalIndent(ld, "", "\t") + log.Infof("CreateLoop loop descriptor: %s", string(ldJson)) + err = sn.createLoop(ctx, ld) + case PacketCloseLoop: var ld routing.LoopData - if err = json.Unmarshal(data, &ld); err == nil { - err = sn.closeLoop(ctx, ld.Loop.Remote.PubKey, routing.LoopData{ - Loop: routing.Loop{ - Remote: ld.Loop.Local, - Local: ld.Loop.Remote, - }, - }) + if err = json.Unmarshal(data, &ld); err != nil { + break } + err = sn.closeLoop(ctx, ld.Loop.Remote.PubKey, routing.LoopData{ + Loop: routing.Loop{ + Remote: ld.Loop.Local, + Local: ld.Loop.Remote, + }, + }) + default: err = errors.New("unknown foundation packet") } sn.metrics.Record(time.Since(startTime), err != nil) if err != nil { - sn.Logger.Infof("Setup request with type %s failed: %s", sp, err) + log.WithError(err).Warnf("Request completed with error.") return proto.WritePacket(RespFailure, err) } + log.Infof("Request completed successfully.") return proto.WritePacket(RespSuccess, nil) } @@ -215,13 +222,12 @@ func (sn *Node) createLoop(ctx context.Context, ld routing.LoopDescriptor) error // // During the setup process each error received along the way causes all the procedure to be canceled. RouteID received // from the 1st step connecting to the initiating node is used as the ID for the overall rule, thus being returned. -func (sn *Node) createRoute(ctx context.Context, keepAlive time.Duration, route routing.Route, - rport, lport routing.Port) (routing.RouteID, error) { +func (sn *Node) createRoute(ctx context.Context, keepAlive time.Duration, route routing.Route, rPort, lPort routing.Port) (routing.RouteID, error) { if len(route) == 0 { return 0, nil } - sn.Logger.Infof("Creating new Route %s", route) + sn.Logger.Infof("Creating a new Route %s", route) // add the initiating node to the start of the route. We need to loop over all the visor nodes // along the route to apply rules including the initiating one @@ -251,7 +257,7 @@ func (sn *Node) createRoute(ctx context.Context, keepAlive time.Duration, route resultingRouteIDCh := make(chan routing.RouteID, 2) // context to cancel rule setup in case of errors - cancellableCtx, cancel := context.WithCancel(ctx) + ctx, cancel := context.WithCancel(ctx) for i := len(r) - 1; i >= 0; i-- { var reqIDChIn, reqIDChOut chan routing.RouteID // goroutine[0] doesn't need to pass the route ID from the 1st step to anyone @@ -268,12 +274,11 @@ func (sn *Node) createRoute(ctx context.Context, keepAlive time.Duration, route nextTpID = r[i+1].Transport rule = routing.ForwardRule(keepAlive, 0, nextTpID, 0) } else { - rule = routing.AppRule(keepAlive, 0, 0, init, lport, rport) + rule = routing.AppRule(keepAlive, 0, 0, init, lPort, rPort) } - go func(i int, pk cipher.PubKey, rule routing.Rule, reqIDChIn <-chan routing.RouteID, - reqIDChOut chan<- routing.RouteID) { - routeID, err := sn.setupRule(cancellableCtx, pk, rule, reqIDChIn, reqIDChOut) + go func(i int, pk cipher.PubKey, rule routing.Rule, reqIDChIn <-chan routing.RouteID, reqIDChOut chan<- routing.RouteID) { + routeID, err := sn.setupRule(ctx, pk, rule, reqIDChIn, reqIDChOut) // adding rule for initiator must result with a route ID for the overall route // it doesn't matter for now if there was an error, this result will be fetched only if there wasn't one if i == 0 { @@ -295,17 +300,16 @@ func (sn *Node) createRoute(ctx context.Context, keepAlive time.Duration, route } var rulesSetupErr error - var cancelOnce sync.Once // check for any errors occurred so far for range r { // filter out context cancellation errors if err := <-rulesSetupErrs; err != nil && err != context.Canceled { // rules setup failed, cancel further setup - cancelOnce.Do(cancel) + cancel() rulesSetupErr = err } } - cancelOnce.Do(cancel) + cancel() // close chan to avoid leaks close(rulesSetupErrs) @@ -349,11 +353,7 @@ func (sn *Node) Close() error { } func (sn *Node) closeLoop(ctx context.Context, on cipher.PubKey, ld routing.LoopData) error { - fmt.Printf(">>> BEGIN: closeLoop(%s, ld)\n", on) - defer fmt.Printf(">>> END: closeLoop(%s, ld)\n", on) - proto, err := sn.dialAndCreateProto(ctx, on) - fmt.Println(">>> *****: closeLoop() dialed:", err) if err != nil { return err } @@ -367,10 +367,9 @@ func (sn *Node) closeLoop(ctx context.Context, on cipher.PubKey, ld routing.Loop return nil } -func (sn *Node) setupRule(ctx context.Context, pk cipher.PubKey, rule routing.Rule, - reqIDChIn <-chan routing.RouteID, reqIDChOut chan<- routing.RouteID) (routing.RouteID, error) { - sn.Logger.Debugf("trying to setup setup rule: %v with %s\n", rule, pk) - requestRouteID, err := sn.requestRouteID(ctx, pk) +func (sn *Node) setupRule(ctx context.Context, pk cipher.PubKey, rule routing.Rule, reqIDChIn <-chan routing.RouteID, reqIDChOut chan<- routing.RouteID) (routing.RouteID, error) { + sn.Logger.Debugf("trying to setup setup rule: %v with %s", rule, pk) + requestRouteID, err := sn.requestRouteID(ctx, pk) // take this. if err != nil { return 0, err } @@ -386,7 +385,7 @@ func (sn *Node) setupRule(ctx context.Context, pk cipher.PubKey, rule routing.Ru rule.SetRequestRouteID(requestRouteID) - sn.Logger.Debugf("dialing to %s to setup rule: %v\n", pk, rule) + sn.Logger.Debugf("dialing to %s to setup rule: %v", pk, rule) if err := sn.addRule(ctx, pk, rule); err != nil { return 0, err diff --git a/pkg/transport/managed_transport.go b/pkg/transport/managed_transport.go index a4f7470e1e..a61869fdf1 100644 --- a/pkg/transport/managed_transport.go +++ b/pkg/transport/managed_transport.go @@ -108,6 +108,7 @@ func (mt *ManagedTransport) Serve(readCh chan<- routing.Packet, done <-chan stru mt.connMx.Unlock() }() + // Read loop. go func() { defer func() { mt.log.Infof("closed readPacket loop.") @@ -133,6 +134,7 @@ func (mt *ManagedTransport) Serve(readCh chan<- routing.Packet, done <-chan stru } }() + // Redial loop. for { select { case <-mt.done: @@ -225,7 +227,6 @@ func (mt *ManagedTransport) Dial(ctx context.Context) error { return mt.dial(ctx) } -// TODO: Figure out where this fella is called. func (mt *ManagedTransport) dial(ctx context.Context) error { tp, err := mt.n.Dial(mt.netName, mt.rPK, snet.TransportPort) if err != nil { diff --git a/pkg/transport/manager.go b/pkg/transport/manager.go index 206aa407fd..ccc246207a 100644 --- a/pkg/transport/manager.go +++ b/pkg/transport/manager.go @@ -116,27 +116,6 @@ func (tm *Manager) serve(ctx context.Context) { } } -// TODO(nkryuchkov): either use or remove if unused -// func (tm *Manager) initTransports(ctx context.Context) { -// tm.mx.Lock() -// defer tm.mx.Unlock() -// -// entries, err := tm.conf.DiscoveryClient.GetTransportsByEdge(ctx, tm.conf.PubKey) -// if err != nil { -// log.Warnf("No transports found for local node: %v", err) -// } -// for _, entry := range entries { -// var ( -// tpType = entry.Entry.Type -// remote = entry.Entry.RemoteEdge(tm.conf.PubKey) -// tpID = entry.Entry.ID -// ) -// if _, err := tm.saveTransport(remote, tpType); err != nil { -// tm.Logger.Warnf("INIT: failed to init tp: type(%s) remote(%s) tpID(%s)", tpType, remote, tpID) -// } -// } -// } - func (tm *Manager) acceptTransport(ctx context.Context, lis *snet.Listener) error { conn, err := lis.AcceptConn() // TODO: tcp panic. if err != nil {