Skip to content

Commit

Permalink
Implemenented ManagedTransport.killWorker. No multiple transports
Browse files Browse the repository at this point in the history
  • Loading branch information
ayuryshev committed Jun 4, 2019
1 parent 3be2313 commit 7c7f644
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 29 deletions.
41 changes: 13 additions & 28 deletions pkg/transport/managed_transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,22 +45,12 @@ func (tr *ManagedTransport) Read(p []byte) (n int, err error) {
tr.mu.RLock()
n, err = tr.Transport.Read(p) // TODO: data race.
tr.mu.RUnlock()
if err == nil {
select {
case <-tr.doneChan:
return
case tr.readLogChan <- n:
}

return
}

select {
case <-tr.doneChan:
return
case tr.errChan <- err:
if err != nil {
tr.errChan <- err
}

tr.readLogChan <- n
return
}

Expand All @@ -69,23 +59,23 @@ func (tr *ManagedTransport) Write(p []byte) (n int, err error) {
tr.mu.RLock()
n, err = tr.Transport.Write(p)
tr.mu.RUnlock()
if err == nil {
select {
case <-tr.doneChan:
return
case tr.writeLogChan <- n:
}

if err != nil {
tr.errChan <- err
return
}
tr.writeLogChan <- n

return
}

func (tr *ManagedTransport) killWorker() {
select {
case <-tr.doneChan:
return
case tr.errChan <- err:
default:
close(tr.doneChan)
}

return
}

// Close closes underlying
Expand All @@ -97,12 +87,7 @@ func (tr *ManagedTransport) Close() error {
err := tr.Transport.Close()
tr.mu.RUnlock()

select {
case <-tr.doneChan:
default:

close(tr.doneChan)
}
tr.killWorker()

return err
}
Expand Down
21 changes: 20 additions & 1 deletion pkg/transport/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ type Manager struct {
isClosing int32
TrChan chan *ManagedTransport
mu sync.RWMutex

mgrQty int32 // Count of spawned manageTransport goroutines
}

// NewManager creates a Manager with the provided configuration and transport factories.
Expand Down Expand Up @@ -243,6 +245,10 @@ func (tm *Manager) DeleteTransport(id uuid.UUID) error {
tr := tm.transports[id]
delete(tm.transports, id)
tm.mu.Unlock()
mgrQty := atomic.AddInt32(&tm.mgrQty, -1)
tm.Logger.Infof("Manager.DeleteTransport id: %v, mgrQty = %v", id, mgrQty)

tr.Close()

if _, err := tm.config.DiscoveryClient.UpdateStatuses(context.Background(), &Status{ID: id, IsUp: false}); err != nil {
tm.Logger.Warnf("Failed to change transport status: %s", err)
Expand Down Expand Up @@ -313,6 +319,11 @@ func (tm *Manager) createTransport(ctx context.Context, remote cipher.PubKey, tp
return nil, err
}

oldTr := tm.Transport(entry.ID)
if oldTr != nil {
oldTr.killWorker()
}

tm.Logger.Infof("Dialed to %s using %s factory. Transport ID: %s", remote, tpType, entry.ID)
mTr := newManagedTransport(entry.ID, tr, entry.Public, false)

Expand Down Expand Up @@ -350,6 +361,11 @@ func (tm *Manager) acceptTransport(ctx context.Context, factory Factory) (*Manag
}

tm.Logger.Infof("Accepted new transport with type %s from %s. ID: %s", factory.Type(), remote, entry.ID)

oldTr := tm.Transport(entry.ID)
if oldTr != nil {
oldTr.killWorker()
}
mTr := newManagedTransport(entry.ID, tr, entry.Public, true)

tm.mu.Lock()
Expand Down Expand Up @@ -383,10 +399,13 @@ func (tm *Manager) addEntry(entry *Entry) {
}

func (tm *Manager) manageTransport(ctx context.Context, mTr *ManagedTransport, factory Factory, remote cipher.PubKey, public bool, accepted bool) {
mgrQty := atomic.AddInt32(&tm.mgrQty, 1)
tm.Logger.Infof("Spawned manageTransport for mTr.ID: %v. mgrQty: %v", mTr.ID, mgrQty)
for {
select {
case <-mTr.doneChan:
tm.Logger.Infof("Transport %s closed", mTr.ID)
mgrQty := atomic.AddInt32(&tm.mgrQty, -1)
tm.Logger.Infof("manageTransport exit for %v. mgrQty: %v", mTr.ID, mgrQty)
return
case err := <-mTr.errChan:
if atomic.LoadInt32(&mTr.isClosing) == 0 {
Expand Down

0 comments on commit 7c7f644

Please sign in to comment.