Skip to content

Commit

Permalink
Atomic fields int32 isClosing changed into isClosing() method in
Browse files Browse the repository at this point in the history
Manager and ManagedTransport
  • Loading branch information
ayuryshev committed Jun 5, 2019
1 parent 2ba8af0 commit 76ed236
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 18 deletions.
23 changes: 13 additions & 10 deletions pkg/transport/managed_transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package transport
import (
"math/big"
"sync"
"sync/atomic"

"github.com/google/uuid"
)
Expand All @@ -17,11 +16,10 @@ type ManagedTransport struct {
Accepted bool
LogEntry *LogEntry

doneChan chan struct{}
errChan chan error
isClosing int32
mu sync.RWMutex
once sync.Once
doneChan chan struct{}
errChan chan error
mu sync.RWMutex
once sync.Once

readLogChan chan int
writeLogChan chan int
Expand Down Expand Up @@ -80,18 +78,23 @@ func (tr *ManagedTransport) killWorker() {

// Close closes underlying
func (tr *ManagedTransport) Close() error {

atomic.StoreInt32(&tr.isClosing, 1)

tr.mu.RLock()
err := tr.Transport.Close()
tr.mu.RUnlock()

tr.killWorker()

return err
}

func (tr *ManagedTransport) isClosing() bool {
select {
case <-tr.doneChan:
return true
default:
return false
}
}

func (tr *ManagedTransport) updateTransport(newTr Transport) {
tr.mu.Lock()
tr.Transport = newTr
Expand Down
22 changes: 14 additions & 8 deletions pkg/transport/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,9 @@ type Manager struct {
transports map[uuid.UUID]*ManagedTransport
entries map[Entry]struct{}

doneChan chan struct{}
isClosing int32
TrChan chan *ManagedTransport
mu sync.RWMutex
doneChan chan struct{}
TrChan chan *ManagedTransport
mu sync.RWMutex

mgrQty int32 // Count of spawned manageTransport goroutines
}
Expand Down Expand Up @@ -262,8 +261,6 @@ func (tm *Manager) DeleteTransport(id uuid.UUID) error {

// Close closes opened transports and registered factories.
func (tm *Manager) Close() error {

atomic.StoreInt32(&tm.isClosing, 1)
close(tm.doneChan)

tm.Logger.Info("Closing transport manager")
Expand Down Expand Up @@ -342,7 +339,7 @@ func (tm *Manager) acceptTransport(ctx context.Context, factory Factory) (*Manag
return nil, err
}

if atomic.LoadInt32(&tm.isClosing) != 0 {
if tm.isClosing() {
return nil, errors.New("transport.Manager is closing. Skipping incoming transport")
}

Expand Down Expand Up @@ -396,6 +393,15 @@ func (tm *Manager) addEntry(entry *Entry) {
tm.mu.Unlock()
}

func (tm *Manager) isClosing() bool {
select {
case <-tm.doneChan:
return true
default:
return false
}
}

func (tm *Manager) manageTransport(ctx context.Context, mTr *ManagedTransport, factory Factory, remote cipher.PubKey, public bool, accepted bool) {
mgrQty := atomic.AddInt32(&tm.mgrQty, 1)
tm.Logger.Infof("Spawned manageTransport for mTr.ID: %v. mgrQty: %v", mTr.ID, mgrQty)
Expand All @@ -406,7 +412,7 @@ func (tm *Manager) manageTransport(ctx context.Context, mTr *ManagedTransport, f
tm.Logger.Infof("manageTransport exit for %v. mgrQty: %v", mTr.ID, mgrQty)
return
case err := <-mTr.errChan:
if atomic.LoadInt32(&mTr.isClosing) == 0 {
if !mTr.isClosing() {
tm.Logger.Infof("Transport %s failed with error: %s. Re-dialing...", mTr.ID, err)
if accepted {
if err := tm.DeleteTransport(mTr.ID); err != nil {
Expand Down

0 comments on commit 76ed236

Please sign in to comment.