Skip to content

Commit

Permalink
Set the flag *before* we retry the message
Browse files Browse the repository at this point in the history
Go correctly caught this as a race condition.
  • Loading branch information
eapache committed Sep 24, 2015
1 parent ce5f6fe commit f15799e
Showing 1 changed file with 1 addition and 1 deletion.
2 changes: 1 addition & 1 deletion async_producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -667,8 +667,8 @@ func (f *flusher) groupAndFilter(batch []*ProducerMessage) map[string]map[int32]

if f.currentRetries[msg.Topic] != nil && f.currentRetries[msg.Topic][msg.Partition] != nil {
// we're currently retrying this partition so we need to filter out this message
f.parent.retryMessages([]*ProducerMessage{msg}, f.currentRetries[msg.Topic][msg.Partition])
msg.flags |= returned
f.parent.retryMessages([]*ProducerMessage{msg}, f.currentRetries[msg.Topic][msg.Partition])

if msg.flags&chaser == chaser {
// ...but now we can start processing future messages again
Expand Down

0 comments on commit f15799e

Please sign in to comment.