Skip to content

Commit

Permalink
Move the consumer's channel send slightly
Browse files Browse the repository at this point in the history
Prep for unblocking consumers that are not being drained
  • Loading branch information
eapache committed Jul 15, 2015
1 parent 4a7c496 commit 26e032c
Showing 1 changed file with 14 additions and 13 deletions.
27 changes: 14 additions & 13 deletions consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -402,9 +402,11 @@ func (child *partitionConsumer) HighWaterMarkOffset() int64 {

func (child *partitionConsumer) responseFeeder() {
for response := range child.feeder {
switch err := child.handleResponse(response); err {
switch msgs, err := child.parseResponse(response); err {
case nil:
break
for _, msg := range msgs {
child.messages <- msg
}
case ErrOffsetOutOfRange:
// there's no point in retrying this it will just fail the same way again
// so shut it down and force the user to choose what to do
Expand All @@ -427,14 +429,14 @@ func (child *partitionConsumer) responseFeeder() {
close(child.errors)
}

func (child *partitionConsumer) handleResponse(response *FetchResponse) error {
func (child *partitionConsumer) parseResponse(response *FetchResponse) ([]*ConsumerMessage, error) {
block := response.GetBlock(child.topic, child.partition)
if block == nil {
return ErrIncompleteResponse
return nil, ErrIncompleteResponse
}

if block.Err != ErrNoError {
return block.Err
return nil, block.Err
}

if len(block.MsgSet.Messages) == 0 {
Expand All @@ -453,16 +455,16 @@ func (child *partitionConsumer) handleResponse(response *FetchResponse) error {
}
}

return nil
return nil, nil
}

// we got messages, reset our fetch size in case it was increased for a previous request
child.fetchSize = child.conf.Consumer.Fetch.Default
atomic.StoreInt64(&child.highWaterMarkOffset, block.HighWaterMarkOffset)

incomplete := false
atLeastOne := false
prelude := true
var messages []*ConsumerMessage
for _, msgBlock := range block.MsgSet.Messages {

for _, msg := range msgBlock.Messages() {
Expand All @@ -472,14 +474,13 @@ func (child *partitionConsumer) handleResponse(response *FetchResponse) error {
prelude = false

if msg.Offset >= child.offset {
atLeastOne = true
child.messages <- &ConsumerMessage{
messages = append(messages, &ConsumerMessage{
Topic: child.topic,
Partition: child.partition,
Key: msg.Msg.Key,
Value: msg.Msg.Value,
Offset: msg.Offset,
}
})
child.offset = msg.Offset + 1
} else {
incomplete = true
Expand All @@ -488,10 +489,10 @@ func (child *partitionConsumer) handleResponse(response *FetchResponse) error {

}

if incomplete || !atLeastOne {
return ErrIncompleteResponse
if incomplete || len(messages) == 0 {
return nil, ErrIncompleteResponse
}
return nil
return messages, nil
}

// brokerConsumer
Expand Down

0 comments on commit 26e032c

Please sign in to comment.