diff --git a/pkg/router/router.go b/pkg/router/router.go index e282e2ae58..6f1d6d849a 100644 --- a/pkg/router/router.go +++ b/pkg/router/router.go @@ -256,6 +256,7 @@ func (r *Router) consumePacket(payload []byte, rule routing.Rule) error { } func (r *Router) forwardAppPacket(appConn *app.Protocol, packet *app.Packet) error { + fmt.Println(">>> PREPARING TO FORWARD APP PACKET!") if packet.Loop.Remote.PubKey == r.config.PubKey { return r.forwardLocalAppPacket(packet) } diff --git a/pkg/router/visor.go b/pkg/router/visor.go index 8adf03afcd..bd1cf10224 100644 --- a/pkg/router/visor.go +++ b/pkg/router/visor.go @@ -68,7 +68,7 @@ func (am *appManager) initApp(payload []byte) error { return errors.New("unexpected app version") } - am.Logger.Infof("Handshaked new connection with the app %s.v%s", config.AppName, config.AppVersion) + am.Logger.Infof("Finished initiating app: %s.v%s", config.AppName, config.AppVersion) return nil } diff --git a/pkg/setup/node.go b/pkg/setup/node.go index ee6609cd6f..431aa80204 100644 --- a/pkg/setup/node.go +++ b/pkg/setup/node.go @@ -82,6 +82,7 @@ func (sn *Node) serveTransport(tr transport.Transport) error { } sn.Logger.Infof("Got new Setup request with type %s: %s", sp, string(data)) + defer sn.Logger.Infof("Completed Setup request with type %s: %s", sp, string(data)) startTime := time.Now() switch sp { @@ -256,9 +257,12 @@ func (sn *Node) remote(edges [2]cipher.PubKey) (cipher.PubKey, bool) { } func (sn *Node) closeLoop(on cipher.PubKey, ld routing.LoopData) error { + fmt.Printf(">>> BEGIN: closeLoop(%s, ld)\n", on) + defer fmt.Printf(">>> END: closeLoop(%s, ld)\n", on) ctx := context.Background() tr, err := sn.messenger.Dial(ctx, on) + fmt.Println(">>> *****: closeLoop() dialed:", err) if err != nil { return fmt.Errorf("transport: %s", err) } diff --git a/pkg/transport/manager.go b/pkg/transport/manager.go index 0858b2d28b..ef0adc702a 100644 --- a/pkg/transport/manager.go +++ b/pkg/transport/manager.go @@ -3,6 +3,7 @@ package transport import ( "context" "errors" + "fmt" "io" "strings" "sync" @@ -61,7 +62,7 @@ func NewManager(config *ManagerConfig, factories ...Factory) (*Manager, error) { } return &Manager{ - Logger: logging.MustGetLogger("trmanager"), + Logger: logging.MustGetLogger("tp_manager"), config: config, factories: fMap, transports: make(map[uuid.UUID]*ManagedTransport), @@ -118,6 +119,8 @@ func (tm *Manager) WalkTransports(walk func(tp *ManagedTransport) bool) { // reconnectTransports tries to reconnect previously established transports. func (tm *Manager) reconnectTransports(ctx context.Context) { + defer tm.Logger.Println("Finished reconnecting transports.") + tm.mu.RLock() entries := make(map[Entry]struct{}) for tmEntry := range tm.entries { @@ -135,12 +138,15 @@ func (tm *Manager) reconnectTransports(ctx context.Context) { continue } + if !tm.IsSetupPK(remote) { + continue + } + _, err := tm.CreateDataTransport(ctx, remote, entry.Type, entry.Public) if err != nil { tm.Logger.Warnf("Failed to re-establish transport: %s", err) continue } - if _, err := tm.config.DiscoveryClient.UpdateStatuses(ctx, &Status{ID: entry.ID, IsUp: true}); err != nil { tm.Logger.Warnf("Failed to change transport status: %s", err) } @@ -200,26 +206,22 @@ func (tm *Manager) Serve(ctx context.Context) error { for { select { case <-ctx.Done(): - tm.Logger.Info("Received ctx.Done()") return case <-tm.doneChan: - tm.Logger.Info("Received tm.doneCh") return default: if _, err := tm.acceptTransport(ctx, f); err != nil { if strings.Contains(err.Error(), "closed") { return } - tm.Logger.Warnf("Failed to accept connection: %s", err) } } - } }(factory) } - tm.Logger.Info("Starting transport manager") + tm.Logger.Info("TransportManager is serving.") wg.Wait() return nil } @@ -343,7 +345,9 @@ func (tm *Manager) dialTransport(ctx context.Context, factory Factory, remote ci return nil, nil, err } + fmt.Printf(">>> INITIATING SETTLEMENT HANDSHAKE: local(%s) remote(%s)\n", tm.config.PubKey, remote) entry, err := settlementInitiatorHandshake(public).Do(tm, tr, time.Minute) + fmt.Printf("<<< COMPLETED SETTLEMENT HANDSHAKE: local(%s) remote(%s) error: %v\n", tm.config.PubKey, remote, err) if err != nil { go func() { if err := tr.Close(); err != nil { @@ -371,15 +375,19 @@ func (tm *Manager) acceptTransport(ctx context.Context, factory Factory) (Transp return nil, errors.New("failed to determine remote edge of accepted transport") } - if isSetup := tm.IsSetupPK(remotePK); isSetup { + fmt.Printf("{tp.Manager} Factory found tp: local(%s) remote(%s)\n", tm.config.PubKey, remotePK) + + if tm.IsSetupPK(remotePK) { select { case <-tm.doneChan: return nil, io.ErrClosedPipe - case tm.SetupTpChan <- tr: + default: + tm.SetupTpChan <- tr return tr, nil } } + // For transports for purpose(data)... entry, err := settlementResponderHandshake().Do(tm, tr, 30*time.Second) if err != nil { go func() {