diff --git a/async_producer.go b/async_producer.go index 8e229490f8..92f9f53fd3 100644 --- a/async_producer.go +++ b/async_producer.go @@ -103,6 +103,7 @@ type flagSet int8 const ( chaser flagSet = 1 << iota // message is last in a group that failed shutdown // start the shutdown process + returned // returned mid-processing, so skip it (hacky fix for #449) ) // ProducerMessage is the collection of elements passed to the Producer in order to send a message. @@ -662,12 +663,12 @@ func (f *flusher) run() { func (f *flusher) groupAndFilter(batch []*ProducerMessage) map[string]map[int32][]*ProducerMessage { msgSets := make(map[string]map[int32][]*ProducerMessage) - for i, msg := range batch { + for _, msg := range batch { if f.currentRetries[msg.Topic] != nil && f.currentRetries[msg.Topic][msg.Partition] != nil { // we're currently retrying this partition so we need to filter out this message f.parent.retryMessages([]*ProducerMessage{msg}, f.currentRetries[msg.Topic][msg.Partition]) - batch[i] = nil + msg.flags |= returned if msg.flags&chaser == chaser { // ...but now we can start processing future messages again @@ -791,12 +792,14 @@ func (p *asyncProducer) buildRequest(batch map[string]map[int32][]*ProducerMessa if msg.Key != nil { if keyBytes, err = msg.Key.Encode(); err != nil { p.returnError(msg, err) + msg.flags |= returned continue } } if msg.Value != nil { if valBytes, err = msg.Value.Encode(); err != nil { p.returnError(msg, err) + msg.flags |= returned continue } } @@ -851,7 +854,7 @@ func (p *asyncProducer) returnError(msg *ProducerMessage, err error) { func (p *asyncProducer) returnErrors(batch []*ProducerMessage, err error) { for _, msg := range batch { - if msg != nil { + if msg.flags&returned == 0 { p.returnError(msg, err) } } @@ -859,7 +862,7 @@ func (p *asyncProducer) returnErrors(batch []*ProducerMessage, err error) { func (p *asyncProducer) returnSuccesses(batch []*ProducerMessage) { for _, msg := range batch { - if msg == nil { + if msg.flags&returned == returned { continue } if p.conf.Producer.Return.Successes {