diff --git a/config.go b/config.go index 881d630a2..916f1d957 100644 --- a/config.go +++ b/config.go @@ -105,6 +105,11 @@ type Config struct { // Equivalent to the JVM's `fetch.wait.max.ms`. MaxWaitTime time.Duration + // The maximum amount of time the consumer expects a message to take to process for the user. If writing to the Messages channel + // takes longer than this, that partition will stop fetching more messages until it can proceed again. Note that, since the + // Messages channel is buffered, the actual grace time is (MaxProcessingTime * ChannelBufferSize). Defaults to 100ms. + MaxProcessingTime time.Duration + // Return specifies what channels will be populated. If they are set to true, you must read from // them to prevent deadlock. Return struct { @@ -147,6 +152,7 @@ func NewConfig() *Config { c.Consumer.Fetch.Default = 32768 c.Consumer.Retry.Backoff = 2 * time.Second c.Consumer.MaxWaitTime = 250 * time.Millisecond + c.Consumer.MaxProcessingTime = 100 * time.Millisecond c.Consumer.Return.Errors = false c.ChannelBufferSize = 256 @@ -239,7 +245,9 @@ func (c *Config) Validate() error { case c.Consumer.Fetch.Max < 0: return ConfigurationError("Invalid Consumer.Fetch.Max, must be >= 0") case c.Consumer.MaxWaitTime < 1*time.Millisecond: - return ConfigurationError("Invalid Consumer.MaxWaitTime, must be > 1ms") + return ConfigurationError("Invalid Consumer.MaxWaitTime, must be >= 1ms") + case c.Consumer.MaxProcessingTime <= 0: + return ConfigurationError("Invalid Consumer.MaxProcessingTime, must be > 0") case c.Consumer.Retry.Backoff < 0: return ConfigurationError("Invalid Consumer.Retry.Backoff, must be >= 0") } diff --git a/consumer.go b/consumer.go index 7fcc49ac5..26b6b9439 100644 --- a/consumer.go +++ b/consumer.go @@ -401,11 +401,21 @@ func (child *partitionConsumer) HighWaterMarkOffset() int64 { } func (child *partitionConsumer) responseFeeder() { +feederLoop: for response := range child.feeder { switch msgs, err := child.parseResponse(response); err { 
case nil: - for _, msg := range msgs { - child.messages <- msg + for i, msg := range msgs { + select { + case child.messages <- msg: + case <-time.After(child.conf.Consumer.MaxProcessingTime): + child.broker.done <- child + for _, msg = range msgs[i:] { + child.messages <- msg + } + child.broker.input <- child + continue feederLoop + } } case ErrOffsetOutOfRange: // there's no point in retrying this it will just fail the same way again @@ -422,7 +432,7 @@ func (child *partitionConsumer) responseFeeder() { child.dispatchReason = err } - child.broker.acks.Done() + child.broker.done <- nil } close(child.messages) @@ -504,7 +514,7 @@ type brokerConsumer struct { newSubscriptions chan []*partitionConsumer wait chan none subscriptions map[*partitionConsumer]none - acks sync.WaitGroup + done chan *partitionConsumer refs int } @@ -516,6 +526,7 @@ func (c *consumer) newBrokerConsumer(broker *Broker) *brokerConsumer { newSubscriptions: make(chan []*partitionConsumer), wait: make(chan none), subscriptions: make(map[*partitionConsumer]none), + done: make(chan *partitionConsumer), refs: 0, } @@ -587,11 +598,17 @@ func (bc *brokerConsumer) subscriptionConsumer() { return } - bc.acks.Add(len(bc.subscriptions)) + expected := len(bc.subscriptions) for child := range bc.subscriptions { child.feeder <- response } - bc.acks.Wait() + for i := 0; i < expected; i++ { + if child := <-bc.done; child != nil { + Logger.Printf("consumer/broker/%d abandoned subscription to %s/%d because consuming was taking too long\n", + bc.broker.ID(), child.topic, child.partition) + delete(bc.subscriptions, child) + } + } } }