diff --git a/pkg/transport/managed_transport.go b/pkg/transport/managed_transport.go index 17181b3ac9..d025e5f18e 100644 --- a/pkg/transport/managed_transport.go +++ b/pkg/transport/managed_transport.go @@ -45,22 +45,12 @@ func (tr *ManagedTransport) Read(p []byte) (n int, err error) { tr.mu.RLock() n, err = tr.Transport.Read(p) // TODO: data race. tr.mu.RUnlock() - if err == nil { - select { - case <-tr.doneChan: - return - case tr.readLogChan <- n: - } - return - } - - select { - case <-tr.doneChan: - return - case tr.errChan <- err: + if err != nil { + tr.errChan <- err } + tr.readLogChan <- n return } @@ -69,23 +59,23 @@ func (tr *ManagedTransport) Write(p []byte) (n int, err error) { tr.mu.RLock() n, err = tr.Transport.Write(p) tr.mu.RUnlock() - if err == nil { - select { - case <-tr.doneChan: - return - case tr.writeLogChan <- n: - } + if err != nil { + tr.errChan <- err return } + tr.writeLogChan <- n + return +} + +func (tr *ManagedTransport) killWorker() { select { case <-tr.doneChan: return - case tr.errChan <- err: + default: + close(tr.doneChan) } - - return } // Close closes underlying @@ -97,12 +87,7 @@ func (tr *ManagedTransport) Close() error { err := tr.Transport.Close() tr.mu.RUnlock() - select { - case <-tr.doneChan: - default: - - close(tr.doneChan) - } + tr.killWorker() return err } diff --git a/pkg/transport/manager.go b/pkg/transport/manager.go index 0763632e91..e00127e466 100644 --- a/pkg/transport/manager.go +++ b/pkg/transport/manager.go @@ -37,6 +37,8 @@ type Manager struct { isClosing int32 TrChan chan *ManagedTransport mu sync.RWMutex + + mgrQty int32 // Count of spawned manageTransport goroutines } // NewManager creates a Manager with the provided configuration and transport factories. @@ -243,6 +245,10 @@ func (tm *Manager) DeleteTransport(id uuid.UUID) error { tr := tm.transports[id] delete(tm.transports, id) tm.mu.Unlock() + mgrQty := atomic.AddInt32(&tm.mgrQty, -1) + tm.Logger.Infof("Manager.DeleteTransport id: %v, mgrQty = %v", id, mgrQty) + + tr.Close() if _, err := tm.config.DiscoveryClient.UpdateStatuses(context.Background(), &Status{ID: id, IsUp: false}); err != nil { tm.Logger.Warnf("Failed to change transport status: %s", err) @@ -313,6 +319,11 @@ func (tm *Manager) createTransport(ctx context.Context, remote cipher.PubKey, tp return nil, err } + oldTr := tm.Transport(entry.ID) + if oldTr != nil { + oldTr.killWorker() + } + tm.Logger.Infof("Dialed to %s using %s factory. Transport ID: %s", remote, tpType, entry.ID) mTr := newManagedTransport(entry.ID, tr, entry.Public, false) @@ -350,6 +361,11 @@ func (tm *Manager) acceptTransport(ctx context.Context, factory Factory) (*Manag } tm.Logger.Infof("Accepted new transport with type %s from %s. ID: %s", factory.Type(), remote, entry.ID) + + oldTr := tm.Transport(entry.ID) + if oldTr != nil { + oldTr.killWorker() + } mTr := newManagedTransport(entry.ID, tr, entry.Public, true) tm.mu.Lock() @@ -383,10 +399,13 @@ func (tm *Manager) addEntry(entry *Entry) { } func (tm *Manager) manageTransport(ctx context.Context, mTr *ManagedTransport, factory Factory, remote cipher.PubKey, public bool, accepted bool) { + mgrQty := atomic.AddInt32(&tm.mgrQty, 1) + tm.Logger.Infof("Spawned manageTransport for mTr.ID: %v. mgrQty: %v", mTr.ID, mgrQty) for { select { case <-mTr.doneChan: - tm.Logger.Infof("Transport %s closed", mTr.ID) + mgrQty := atomic.AddInt32(&tm.mgrQty, -1) + tm.Logger.Infof("manageTransport exit for %v. mgrQty: %v", mTr.ID, mgrQty) return case err := <-mTr.errChan: if atomic.LoadInt32(&mTr.isClosing) == 0 {