Skip to content

Commit

Permalink
Move the consumer's channel send slightly
Browse files Browse the repository at this point in the history
Prep for unblocking consumers that are not being drained
  • Loading branch information
eapache committed Jul 29, 2015
1 parent 8a91a50 commit 7e4b74b
Showing 1 changed file with 18 additions and 12 deletions.
30 changes: 18 additions & 12 deletions consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -401,23 +401,30 @@ func (child *partitionConsumer) HighWaterMarkOffset() int64 {
}

func (child *partitionConsumer) responseFeeder() {
var msgs []*ConsumerMessage

for response := range child.feeder {
child.responseResult = child.handleResponse(response)
msgs, child.responseResult = child.parseResponse(response)

for _, msg := range msgs {
child.messages <- msg
}

child.broker.acks.Done()
}

close(child.messages)
close(child.errors)
}

func (child *partitionConsumer) handleResponse(response *FetchResponse) error {
func (child *partitionConsumer) parseResponse(response *FetchResponse) ([]*ConsumerMessage, error) {
block := response.GetBlock(child.topic, child.partition)
if block == nil {
return ErrIncompleteResponse
return nil, ErrIncompleteResponse
}

if block.Err != ErrNoError {
return block.Err
return nil, block.Err
}

if len(block.MsgSet.Messages) == 0 {
Expand All @@ -436,16 +443,16 @@ func (child *partitionConsumer) handleResponse(response *FetchResponse) error {
}
}

return nil
return nil, nil
}

// we got messages, reset our fetch size in case it was increased for a previous request
child.fetchSize = child.conf.Consumer.Fetch.Default
atomic.StoreInt64(&child.highWaterMarkOffset, block.HighWaterMarkOffset)

incomplete := false
atLeastOne := false
prelude := true
var messages []*ConsumerMessage
for _, msgBlock := range block.MsgSet.Messages {

for _, msg := range msgBlock.Messages() {
Expand All @@ -455,14 +462,13 @@ func (child *partitionConsumer) handleResponse(response *FetchResponse) error {
prelude = false

if msg.Offset >= child.offset {
atLeastOne = true
child.messages <- &ConsumerMessage{
messages = append(messages, &ConsumerMessage{
Topic: child.topic,
Partition: child.partition,
Key: msg.Msg.Key,
Value: msg.Msg.Value,
Offset: msg.Offset,
}
})
child.offset = msg.Offset + 1
} else {
incomplete = true
Expand All @@ -471,10 +477,10 @@ func (child *partitionConsumer) handleResponse(response *FetchResponse) error {

}

if incomplete || !atLeastOne {
return ErrIncompleteResponse
if incomplete || len(messages) == 0 {
return nil, ErrIncompleteResponse
}
return nil
return messages, nil
}

// brokerConsumer
Expand Down

0 comments on commit 7e4b74b

Please sign in to comment.