From 292f3b0aa1d7adfb715ebef42f7f95a1c8748a5f Mon Sep 17 00:00:00 2001
From: Evan Huus
Date: Wed, 15 Jul 2015 14:44:18 -0400
Subject: [PATCH] consumer: don't block on undrained partitions

If a partitionConsumer fills up and is not being drained (or is being
drained too slowly), remove its subscription until it can proceed again,
so that it does not block other partitions which may still be making
progress.
---
 config.go        | 10 +++++++++-
 consumer.go      | 39 +++++++++++++++++++++++++++++----------
 consumer_test.go |  1 +
 3 files changed, 39 insertions(+), 11 deletions(-)

diff --git a/config.go b/config.go
index 264dfa0af..653db45f0 100644
--- a/config.go
+++ b/config.go
@@ -105,6 +105,11 @@ type Config struct {
 		// Equivalent to the JVM's `fetch.wait.max.ms`.
 		MaxWaitTime time.Duration
 
+		// The maximum amount of time the consumer expects a message to take to process for the user. If writing to the Messages channel
+		// takes longer than this, that partition will stop fetching more messages until it can proceed again. Note that, since the
+		// Messages channel is buffered, the actual grace time is (MaxProcessingTime * ChannelBufferSize). Defaults to 100ms.
+		MaxProcessingTime time.Duration
+
 		// Return specifies what channels will be populated. If they are set to true, you must read from
 		// them to prevent deadlock.
 		Return struct {
@@ -147,6 +152,7 @@ func NewConfig() *Config {
 	c.Consumer.Fetch.Default = 32768
 	c.Consumer.Retry.Backoff = 2 * time.Second
 	c.Consumer.MaxWaitTime = 250 * time.Millisecond
+	c.Consumer.MaxProcessingTime = 100 * time.Millisecond
 	c.Consumer.Return.Errors = false
 
 	c.ChannelBufferSize = 256
@@ -239,7 +245,9 @@ func (c *Config) Validate() error {
 	case c.Consumer.Fetch.Max < 0:
 		return ConfigurationError("Consumer.Fetch.Max must be >= 0")
 	case c.Consumer.MaxWaitTime < 1*time.Millisecond:
-		return ConfigurationError("Consumer.MaxWaitTime must be > 1ms")
+		return ConfigurationError("Consumer.MaxWaitTime must be >= 1ms")
+	case c.Consumer.MaxProcessingTime <= 0:
+		return ConfigurationError("Consumer.MaxProcessingTime must be > 0")
 	case c.Consumer.Retry.Backoff < 0:
 		return ConfigurationError("Consumer.Retry.Backoff must be >= 0")
 	}
diff --git a/consumer.go b/consumer.go
index 6c7eaf4da..43ce3b21b 100644
--- a/consumer.go
+++ b/consumer.go
@@ -1,6 +1,7 @@
 package sarama
 
 import (
+	"errors"
 	"fmt"
 	"sync"
 	"sync/atomic"
@@ -278,6 +279,8 @@ type partitionConsumer struct {
 	highWaterMarkOffset int64
 }
 
+var errTimedOut = errors.New("timed out feeding messages to the user") // not user-facing
+
 func (child *partitionConsumer) sendError(err error) {
 	cErr := &ConsumerError{
 		Topic:     child.topic,
@@ -403,11 +406,22 @@ func (child *partitionConsumer) HighWaterMarkOffset() int64 {
 func (child *partitionConsumer) responseFeeder() {
 	var msgs []*ConsumerMessage
 
+feederLoop:
 	for response := range child.feeder {
 		msgs, child.responseResult = child.parseResponse(response)
 
-		for _, msg := range msgs {
-			child.messages <- msg
+		for i, msg := range msgs {
+			select {
+			case child.messages <- msg:
+			case <-time.After(child.conf.Consumer.MaxProcessingTime):
+				child.responseResult = errTimedOut
+				child.broker.acks.Done()
+				for _, msg = range msgs[i:] {
+					child.messages <- msg
+				}
+				child.broker.input <- child
+				continue feederLoop
+			}
 		}
 
 		child.broker.acks.Done()
@@ -596,32 +610,37 @@ func (bc *brokerConsumer) handleResponses() {
 			close(child.trigger)
 			delete(bc.subscriptions, child)
 		default:
-			switch child.responseResult {
+			result := child.responseResult
+			child.responseResult = nil
+
+			switch result {
 			case nil:
 				break
+			case errTimedOut:
+				Logger.Printf("consumer/broker/%d abandoned subscription to %s/%d because consuming was taking too long\n",
+					bc.broker.ID(), child.topic, child.partition)
+				delete(bc.subscriptions, child)
 			case ErrOffsetOutOfRange:
 				// there's no point in retrying this it will just fail the same way again
 				// shut it down and force the user to choose what to do
-				child.sendError(child.responseResult)
-				Logger.Printf("consumer/%s/%d shutting down because %s\n", child.topic, child.partition, child.responseResult)
+				child.sendError(result)
+				Logger.Printf("consumer/%s/%d shutting down because %s\n", child.topic, child.partition, result)
 				close(child.trigger)
 				delete(bc.subscriptions, child)
 			case ErrUnknownTopicOrPartition, ErrNotLeaderForPartition, ErrLeaderNotAvailable:
 				// not an error, but does need redispatching
 				Logger.Printf("consumer/broker/%d abandoned subscription to %s/%d because %s\n",
-					bc.broker.ID(), child.topic, child.partition, child.responseResult)
+					bc.broker.ID(), child.topic, child.partition, result)
 				child.trigger <- none{}
 				delete(bc.subscriptions, child)
 			default:
 				// dunno, tell the user and try redispatching
-				child.sendError(child.responseResult)
+				child.sendError(result)
 				Logger.Printf("consumer/broker/%d abandoned subscription to %s/%d because %s\n",
-					bc.broker.ID(), child.topic, child.partition, child.responseResult)
+					bc.broker.ID(), child.topic, child.partition, result)
 				child.trigger <- none{}
 				delete(bc.subscriptions, child)
 			}
-
-			child.responseResult = nil
 		}
 	}
 }
diff --git a/consumer_test.go b/consumer_test.go
index cad709b53..6617f149e 100644
--- a/consumer_test.go
+++ b/consumer_test.go
@@ -390,6 +390,7 @@ func TestConsumerInterleavedClose(t *testing.T) {
 	fetchResponse.AddMessage("my_topic", 0, nil, ByteEncoder([]byte{0x00, 0x0E}), int64(1))
 	fetchResponse.AddMessage("my_topic", 1, nil, ByteEncoder([]byte{0x00, 0x0E}), int64(0))
 	leader.Returns(fetchResponse)
+	leader.Returns(fetchResponse)
 
 	safeClose(t, c1)
 	safeClose(t, c0)
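
For reviewers, a rough usage sketch from the application side (not part of the patch; the broker address, topic, and partition are placeholder assumptions): if the handling loop below stalls on a message for longer than Consumer.MaxProcessingTime, this partition's subscription is dropped and re-established once the buffered Messages channel drains, so other partitions on the same broker keep fetching.

package main

import (
	"log"
	"time"

	"github.com/Shopify/sarama"
)

func main() {
	config := sarama.NewConfig()
	// If a send on the Messages channel blocks for longer than this, the
	// partition stops fetching until the application catches up. Since the
	// channel is buffered, the effective grace period is roughly
	// MaxProcessingTime * ChannelBufferSize.
	config.Consumer.MaxProcessingTime = 100 * time.Millisecond
	config.ChannelBufferSize = 256

	// Placeholder broker address, topic, and partition for illustration only.
	consumer, err := sarama.NewConsumer([]string{"localhost:9092"}, config)
	if err != nil {
		log.Fatalln(err)
	}
	defer consumer.Close()

	pc, err := consumer.ConsumePartition("my_topic", 0, sarama.OffsetNewest)
	if err != nil {
		log.Fatalln(err)
	}
	defer pc.Close()

	for msg := range pc.Messages() {
		// Slow handling here no longer stalls sibling partitions on the same
		// broker; at worst this partition is temporarily unsubscribed and
		// resumed once its buffered messages have been consumed.
		log.Printf("offset %d: %s", msg.Offset, string(msg.Value))
	}
}

Nothing changes for applications that already drain Messages promptly; the timeout only affects partitions whose consumers fall behind.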