Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ensure we always have called Add() on the inflight counter before we Wait() for it #450

Merged
merged 1 commit into from
May 18, 2015
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 6 additions & 4 deletions async_producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,7 @@ func (p *asyncProducer) topicDispatcher() {

if msg.flags&shutdown != 0 {
shuttingDown = true
p.inFlight.Done()
continue
} else if msg.retries == 0 {
p.inFlight.Add(1)
Expand Down Expand Up @@ -256,7 +257,7 @@ func (p *asyncProducer) topicDispatcher() {

// one per topic
// partitions messages, then dispatches them by partition
func (p *asyncProducer) partitionDispatcher(topic string, input chan *ProducerMessage) {
func (p *asyncProducer) partitionDispatcher(topic string, input <-chan *ProducerMessage) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

these channel restrictions are a good idea though

handlers := make(map[int32]chan *ProducerMessage)
partitioner := p.conf.Producer.Partitioner(topic)
breaker := breaker.New(3, 1, 10*time.Second)
Expand Down Expand Up @@ -293,7 +294,7 @@ func (p *asyncProducer) partitionDispatcher(topic string, input chan *ProducerMe
// one per partition per topic
// dispatches messages to the appropriate broker
// also responsible for maintaining message order during retries
func (p *asyncProducer) leaderDispatcher(topic string, partition int32, input chan *ProducerMessage) {
func (p *asyncProducer) leaderDispatcher(topic string, partition int32, input <-chan *ProducerMessage) {
var leader *Broker
var output chan *ProducerMessage

Expand Down Expand Up @@ -413,7 +414,7 @@ func (p *asyncProducer) leaderDispatcher(topic string, partition int32, input ch
// one per broker
// groups messages together into appropriately-sized batches for sending to the broker
// based on https://godoc.org/github.com/eapache/channels#BatchingChannel
func (p *asyncProducer) messageAggregator(broker *Broker, input chan *ProducerMessage) {
func (p *asyncProducer) messageAggregator(broker *Broker, input <-chan *ProducerMessage) {
var (
timer <-chan time.Time
buffer []*ProducerMessage
Expand Down Expand Up @@ -477,7 +478,7 @@ shutdown:

// one per broker
// takes a batch at a time from the messageAggregator and sends to the broker
func (p *asyncProducer) flusher(broker *Broker, input chan []*ProducerMessage) {
func (p *asyncProducer) flusher(broker *Broker, input <-chan []*ProducerMessage) {
var closing error
currentRetries := make(map[string]map[int32]error)
Logger.Printf("producer/flusher/%d starting up\n", broker.ID())
Expand Down Expand Up @@ -610,6 +611,7 @@ func (p *asyncProducer) retryHandler() {

func (p *asyncProducer) shutdown() {
Logger.Println("Producer shutting down.")
p.inFlight.Add(1)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think this is necessary? If you create a wait group and then immediately Wait on it (without ever calling Add) it just returns right away...

p.input <- &ProducerMessage{flags: shutdown}

p.inFlight.Wait()
Expand Down