Skip to content

Commit

Permalink
Merge pull request #45 from mreiferson/close_race_45
Browse files Browse the repository at this point in the history
Panic via nsq.Producer
  • Loading branch information
jehiah committed Jun 15, 2014
2 parents 811ab9f + 1cd37f5 commit 394ebba
Showing 1 changed file with 9 additions and 1 deletion.
10 changes: 9 additions & 1 deletion producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ type Producer struct {
stopFlag int32
exitChan chan int
wg sync.WaitGroup
guard sync.Mutex
}

// ProducerTransaction is returned by the async publish methods
Expand Down Expand Up @@ -86,13 +87,17 @@ func (w *Producer) String() string {

// Stop initiates a graceful stop of the Producer (permanent)
//
// NOTE: receive on StopChan to block until this process completes
// NOTE: this blocks until completion
func (w *Producer) Stop() {
w.guard.Lock()
if !atomic.CompareAndSwapInt32(&w.stopFlag, 0, 1) {
w.guard.Unlock()
return
}
w.log(LogLevelInfo, "stopping")
close(w.exitChan)
w.close()
w.guard.Unlock()
w.wg.Wait()
}

Expand Down Expand Up @@ -181,6 +186,9 @@ func (w *Producer) sendCommandAsync(cmd *Command, doneChan chan *ProducerTransac
}

func (w *Producer) connect() error {
w.guard.Lock()
defer w.guard.Unlock()

if atomic.LoadInt32(&w.stopFlag) == 1 {
return ErrStopped
}
Expand Down

0 comments on commit 394ebba

Please sign in to comment.