Skip to content

Commit

Permalink
consumer: don't block on undrained partitions
Browse files Browse the repository at this point in the history
If a partitionConsumer fills up and is not being drained (or is taking a long
time) remove its subscription until it can proceed again in order to not block
other partitions which may still be making progress.
  • Loading branch information
eapache committed Jul 16, 2015
1 parent 26e032c commit 6216a3f
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 7 deletions.
10 changes: 9 additions & 1 deletion config.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,11 @@ type Config struct {
// Equivalent to the JVM's `fetch.wait.max.ms`.
MaxWaitTime time.Duration

// The maximum amount of time the consumer expects a message takes to process for the user. If writing to the Messages channel
// takes longer than this, that partition will stop fetching more messages until it can proceed again. Note that, since the
// Messages channel is buffered, the actual grace time is (MaxProcessingTime * ChanneBufferSize). Defaults to 100ms.
MaxProcessingTime time.Duration

// Return specifies what channels will be populated. If they are set to true, you must read from
// them to prevent deadlock.
Return struct {
Expand Down Expand Up @@ -147,6 +152,7 @@ func NewConfig() *Config {
c.Consumer.Fetch.Default = 32768
c.Consumer.Retry.Backoff = 2 * time.Second
c.Consumer.MaxWaitTime = 250 * time.Millisecond
c.Consumer.MaxProcessingTime = 100 * time.Millisecond
c.Consumer.Return.Errors = false

c.ChannelBufferSize = 256
Expand Down Expand Up @@ -239,7 +245,9 @@ func (c *Config) Validate() error {
case c.Consumer.Fetch.Max < 0:
return ConfigurationError("Invalid Consumer.Fetch.Max, must be >= 0")
case c.Consumer.MaxWaitTime < 1*time.Millisecond:
return ConfigurationError("Invalid Consumer.MaxWaitTime, must be > 1ms")
return ConfigurationError("Invalid Consumer.MaxWaitTime, must be >= 1ms")
case c.Consumer.MaxProcessingTime <= 0:
return ConfigurationError("Invalid Consumer.MaxProcessingTime, must be > 0")
case c.Consumer.Retry.Backoff < 0:
return ConfigurationError("Invalid Consumer.Retry.Backoff, must be >= 0")
}
Expand Down
29 changes: 23 additions & 6 deletions consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -401,11 +401,21 @@ func (child *partitionConsumer) HighWaterMarkOffset() int64 {
}

func (child *partitionConsumer) responseFeeder() {
feederLoop:
for response := range child.feeder {
switch msgs, err := child.parseResponse(response); err {
case nil:
for _, msg := range msgs {
child.messages <- msg
for i, msg := range msgs {
select {
case child.messages <- msg:
case <-time.After(child.conf.Consumer.MaxProcessingTime):
child.broker.done <- child
for _, msg = range msgs[i:] {
child.messages <- msg
}
child.broker.input <- child
continue feederLoop
}
}
case ErrOffsetOutOfRange:
// there's no point in retrying this it will just fail the same way again
Expand All @@ -422,7 +432,7 @@ func (child *partitionConsumer) responseFeeder() {
child.dispatchReason = err
}

child.broker.acks.Done()
child.broker.done <- nil
}

close(child.messages)
Expand Down Expand Up @@ -504,7 +514,7 @@ type brokerConsumer struct {
newSubscriptions chan []*partitionConsumer
wait chan none
subscriptions map[*partitionConsumer]none
acks sync.WaitGroup
done chan *partitionConsumer
refs int
}

Expand All @@ -516,6 +526,7 @@ func (c *consumer) newBrokerConsumer(broker *Broker) *brokerConsumer {
newSubscriptions: make(chan []*partitionConsumer),
wait: make(chan none),
subscriptions: make(map[*partitionConsumer]none),
done: make(chan *partitionConsumer),
refs: 0,
}

Expand Down Expand Up @@ -587,11 +598,17 @@ func (bc *brokerConsumer) subscriptionConsumer() {
return
}

bc.acks.Add(len(bc.subscriptions))
expected := len(bc.subscriptions)
for child := range bc.subscriptions {
child.feeder <- response
}
bc.acks.Wait()
for i := 0; i < expected; i++ {
if child := <-bc.done; child != nil {
Logger.Printf("consumer/broker/%d abandoned subscription to %s/%d because consuming was taking too long\n",
bc.broker.ID(), child.topic, child.partition)
delete(bc.subscriptions, child)
}
}
}
}

Expand Down

0 comments on commit 6216a3f

Please sign in to comment.