From 26e032c678b695c72a767f9d2cebe645486779ab Mon Sep 17 00:00:00 2001 From: Evan Huus Date: Wed, 15 Jul 2015 13:07:13 -0400 Subject: [PATCH] Move the consumer's channel send slightly Prep for unblocking consumers that are not being drained --- consumer.go | 27 ++++++++++++++------------- 1 file changed, 14 insertions(+), 13 deletions(-) diff --git a/consumer.go b/consumer.go index 8a3ca6f02..7fcc49ac5 100644 --- a/consumer.go +++ b/consumer.go @@ -402,9 +402,11 @@ func (child *partitionConsumer) HighWaterMarkOffset() int64 { func (child *partitionConsumer) responseFeeder() { for response := range child.feeder { - switch err := child.handleResponse(response); err { + switch msgs, err := child.parseResponse(response); err { case nil: - break + for _, msg := range msgs { + child.messages <- msg + } case ErrOffsetOutOfRange: // there's no point in retrying this it will just fail the same way again // so shut it down and force the user to choose what to do @@ -427,14 +429,14 @@ func (child *partitionConsumer) responseFeeder() { close(child.errors) } -func (child *partitionConsumer) handleResponse(response *FetchResponse) error { +func (child *partitionConsumer) parseResponse(response *FetchResponse) ([]*ConsumerMessage, error) { block := response.GetBlock(child.topic, child.partition) if block == nil { - return ErrIncompleteResponse + return nil, ErrIncompleteResponse } if block.Err != ErrNoError { - return block.Err + return nil, block.Err } if len(block.MsgSet.Messages) == 0 { @@ -453,7 +455,7 @@ func (child *partitionConsumer) handleResponse(response *FetchResponse) error { } } - return nil + return nil, nil } // we got messages, reset our fetch size in case it was increased for a previous request @@ -461,8 +463,8 @@ func (child *partitionConsumer) handleResponse(response *FetchResponse) error { atomic.StoreInt64(&child.highWaterMarkOffset, block.HighWaterMarkOffset) incomplete := false - atLeastOne := false prelude := true + var messages []*ConsumerMessage for _, msgBlock := range block.MsgSet.Messages { for _, msg := range msgBlock.Messages() { @@ -472,14 +474,13 @@ func (child *partitionConsumer) handleResponse(response *FetchResponse) error { prelude = false if msg.Offset >= child.offset { - atLeastOne = true - child.messages <- &ConsumerMessage{ + messages = append(messages, &ConsumerMessage{ Topic: child.topic, Partition: child.partition, Key: msg.Msg.Key, Value: msg.Msg.Value, Offset: msg.Offset, - } + }) child.offset = msg.Offset + 1 } else { incomplete = true @@ -488,10 +489,10 @@ func (child *partitionConsumer) handleResponse(response *FetchResponse) error { } - if incomplete || !atLeastOne { - return ErrIncompleteResponse + if incomplete || len(messages) == 0 { + return nil, ErrIncompleteResponse } - return nil + return messages, nil } // brokerConsumer