diff --git a/async_producer.go b/async_producer.go
index 5c23ac775..61746c8fd 100644
--- a/async_producer.go
+++ b/async_producer.go
@@ -613,6 +613,18 @@ func (pp *partitionProducer) backoff(retries int) {
 	}
 }
 
+func (pp *partitionProducer) updateLeaderIfBrokerProducerIsNil(msg *ProducerMessage) error {
+	if pp.brokerProducer == nil {
+		if err := pp.updateLeader(); err != nil {
+			pp.parent.returnError(msg, err)
+			pp.backoff(msg.retries)
+			return err
+		}
+		Logger.Printf("producer/leader/%s/%d selected broker %d\n", pp.topic, pp.partition, pp.leader.ID())
+	}
+	return nil
+}
+
 func (pp *partitionProducer) dispatch() {
 	// try to prefetch the leader; if this doesn't work, we'll do a proper call to `updateLeader`
 	// on the first message
@@ -644,6 +656,9 @@ func (pp *partitionProducer) dispatch() {
 		}
 
 		if msg.retries > pp.highWatermark {
+			if err := pp.updateLeaderIfBrokerProducerIsNil(msg); err != nil {
+				continue
+			}
 			// a new, higher, retry level; handle it and then back off
 			pp.newHighWatermark(msg.retries)
 			pp.backoff(msg.retries)
@@ -670,14 +685,8 @@ func (pp *partitionProducer) dispatch() {
 
 		// if we made it this far then the current msg contains real data, and can be sent to the next goroutine
 		// without breaking any of our ordering guarantees
-
-		if pp.brokerProducer == nil {
-			if err := pp.updateLeader(); err != nil {
-				pp.parent.returnError(msg, err)
-				pp.backoff(msg.retries)
-				continue
-			}
-			Logger.Printf("producer/leader/%s/%d selected broker %d\n", pp.topic, pp.partition, pp.leader.ID())
+		if err := pp.updateLeaderIfBrokerProducerIsNil(msg); err != nil {
+			continue
 		}
 
 		// Now that we know we have a broker to actually try and send this message to, generate the sequence