Skip to content

Commit

Permalink
Quick hacky fix for #449
Browse files Browse the repository at this point in the history
Pending a proper refactor and tests etc.
  • Loading branch information
eapache committed Sep 24, 2015
1 parent 4faee61 commit ce5f6fe
Showing 1 changed file with 7 additions and 4 deletions.
11 changes: 7 additions & 4 deletions async_producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ type flagSet int8
const (
chaser flagSet = 1 << iota // message is last in a group that failed
shutdown // start the shutdown process
returned // returned mid-processing, so skip it (hacky fix for #449)
)

// ProducerMessage is the collection of elements passed to the Producer in order to send a message.
Expand Down Expand Up @@ -662,12 +663,12 @@ func (f *flusher) run() {
func (f *flusher) groupAndFilter(batch []*ProducerMessage) map[string]map[int32][]*ProducerMessage {
msgSets := make(map[string]map[int32][]*ProducerMessage)

for i, msg := range batch {
for _, msg := range batch {

if f.currentRetries[msg.Topic] != nil && f.currentRetries[msg.Topic][msg.Partition] != nil {
// we're currently retrying this partition so we need to filter out this message
f.parent.retryMessages([]*ProducerMessage{msg}, f.currentRetries[msg.Topic][msg.Partition])
batch[i] = nil
msg.flags |= returned

if msg.flags&chaser == chaser {
// ...but now we can start processing future messages again
Expand Down Expand Up @@ -791,12 +792,14 @@ func (p *asyncProducer) buildRequest(batch map[string]map[int32][]*ProducerMessa
if msg.Key != nil {
if keyBytes, err = msg.Key.Encode(); err != nil {
p.returnError(msg, err)
msg.flags |= returned
continue
}
}
if msg.Value != nil {
if valBytes, err = msg.Value.Encode(); err != nil {
p.returnError(msg, err)
msg.flags |= returned
continue
}
}
Expand Down Expand Up @@ -851,15 +854,15 @@ func (p *asyncProducer) returnError(msg *ProducerMessage, err error) {

func (p *asyncProducer) returnErrors(batch []*ProducerMessage, err error) {
for _, msg := range batch {
if msg != nil {
if msg.flags&returned == 0 {
p.returnError(msg, err)
}
}
}

func (p *asyncProducer) returnSuccesses(batch []*ProducerMessage) {
for _, msg := range batch {
if msg == nil {
if msg.flags&returned == returned {
continue
}
if p.conf.Producer.Return.Successes {
Expand Down

0 comments on commit ce5f6fe

Please sign in to comment.