diff --git a/pkg/node/node.go b/pkg/node/node.go index 05f9e5a1d0..58e6bae6d8 100644 --- a/pkg/node/node.go +++ b/pkg/node/node.go @@ -71,7 +71,7 @@ type PacketRouter interface { io.Closer Serve(ctx context.Context) error ServeApp(conn net.Conn, port uint16, appConf *app.Config) error - IsSetupTransport(tr transport.Transport) bool + IsSetupTransport(tr *transport.ManagedTransport) bool } // Node provides messaging runtime for Apps by setting up all diff --git a/pkg/node/node_test.go b/pkg/node/node_test.go index f9248631fa..79f1a94cb9 100644 --- a/pkg/node/node_test.go +++ b/pkg/node/node_test.go @@ -254,6 +254,6 @@ func (r *mockRouter) Close() error { return nil } -func (r *mockRouter) IsSetupTransport(tr transport.Transport) bool { +func (r *mockRouter) IsSetupTransport(tr *transport.ManagedTransport) bool { return false } diff --git a/pkg/router/router.go b/pkg/router/router.go index 7a4c78ed28..9390cbe106 100644 --- a/pkg/router/router.go +++ b/pkg/router/router.go @@ -82,7 +82,7 @@ func (r *Router) Serve(ctx context.Context) error { go func() { for tr := range r.tm.TrChan { if tr.Accepted { - go func(t transport.Transport) { + go func(t *transport.ManagedTransport) { for { var err error if r.IsSetupTransport(t) { @@ -100,7 +100,7 @@ func (r *Router) Serve(ctx context.Context) error { } }(tr) } else { - go func(t transport.Transport) { + go func(t *transport.ManagedTransport) { for { if err := r.serveTransport(t); err != nil { if err != io.EOF { @@ -178,7 +178,7 @@ func (r *Router) Close() error { return r.tm.Close() } -func (r *Router) serveTransport(tr transport.Transport) error { +func (r *Router) serveTransport(tr *transport.ManagedTransport) error { packet := make(routing.Packet, 6) if _, err := io.ReadFull(tr, packet); err != nil { return err @@ -476,7 +476,7 @@ func (r *Router) advanceNoiseHandshake(addr *app.LoopAddr, noiseMsg []byte) (ni } // IsSetupTransport checks whether `tr` is running in the `setup` mode. -func (r *Router) IsSetupTransport(tr transport.Transport) bool { +func (r *Router) IsSetupTransport(tr *transport.ManagedTransport) bool { for _, pk := range r.config.SetupNodes { remote, ok := r.tm.Remote(tr.Edges()) if ok && (remote == pk) { diff --git a/pkg/setup/node.go b/pkg/setup/node.go index 5f1a56ac57..d95d6759cf 100644 --- a/pkg/setup/node.go +++ b/pkg/setup/node.go @@ -106,24 +106,6 @@ func (sn *Node) Serve(ctx context.Context) error { } }() - // go func() { - // for tr := range sn.tm.AcceptedTrChan { - // go func(t transport.Transport) { - // for { - // if err := sn.serveTransport(t); err != nil { - // sn.Logger.Warnf("Failed to serve Transport: %s", err) - // return - // } - // } - // }(tr) - // } - // }() - - // go func() { - // for range sn.tm.DialedTrChan { - // } - // }() - sn.Logger.Info("Starting Setup Node") return sn.tm.Serve(ctx) } diff --git a/pkg/transport/manager.go b/pkg/transport/manager.go index b5b3e67f21..cd142e023b 100644 --- a/pkg/transport/manager.go +++ b/pkg/transport/manager.go @@ -33,10 +33,8 @@ type Manager struct { entries map[Entry]struct{} doneChan chan struct{} - // AcceptedTrChan chan *ManagedTransport - // DialedTrChan chan *ManagedTransport - TrChan chan *ManagedTransport - mu sync.RWMutex + TrChan chan *ManagedTransport + mu sync.RWMutex } // NewManager creates a Manager with the provided configuration and transport factories. @@ -67,12 +65,6 @@ func NewManager(config *ManagerConfig, factories ...Factory) (*Manager, error) { }, nil } -// // Observe returns channel for notifications about new Transport -// // registration. Only single observer is supported. -// func (tm *Manager) Observe() (accept <-chan *ManagedTransport, dial <-chan *ManagedTransport) { -// return tm.AcceptedTrChan, tm.DialedTrChan -// } - // Factories returns all the factory types contained within the TransportManager. func (tm *Manager) Factories() []string { fTypes, i := make([]string, len(tm.factories)), 0 @@ -383,7 +375,6 @@ func (tm *Manager) acceptTransport(ctx context.Context, factory Factory) (*Manag } tm.mu.Unlock() - // go func(managedTr *ManagedTransport, tm *Manager) { go func() { select { case <-managedTr.doneChan: