Skip to content

Commit

Permalink
fix: avoid starvation in subscriptionManager
Browse files Browse the repository at this point in the history
The first few fetches from Kafka may only fetch data from one or two
partitions, starving the rest for a very long time (depending on message
size / processing time)

Once a member joins the consumer groups and receives its partitions,
they are fed into the "subscription manager" from different go routines.
The subscription manager then performs batching and executes a fetch for
all the partitions. However, it seems like the batching logic in
`subscriptionManager` is faulty, perhaps assuming that `case:` order
prioritizes which `case` should be handled when all are signaled which
is not the case, according to the go docs
(https://golang.org/ref/spec#Select_statements):
```
If one or more of the communications can proceed, a single one that can
proceed is chosen via a uniform pseudo-random selection. Otherwise, if
there is a default case, that case is chosen. If there is no default
case, the "select" statement blocks until at least one of the
communications can proceed.
```

For example - if you receive 64 partitions, each will be handled in
their own go routine which sends the partition information to the
`bc.input` channel. After an iteration there is a race between `case
event, ok := <-bc.input` which will batch the request and `case
bc.newSubscriptions <- buffer` which will trigger an immediate fetch of
the 1 or 2 partitions that made it into the batch.

This issue only really affects slow consumers with short messages. If
the condition happens with 1 partition being in the batch (even though
63 extra partitions have been claimed but didn't make it into the batch)
the fetch will ask for 1MB (by default) of messages from that single
partition. If the messages are only a few bytes long and processing time
is minutes, you will not perform another fetch for hours.

Contributes-to: #1608 #1897
  • Loading branch information
pavius authored and dnwe committed Jan 12, 2022
1 parent a060eca commit 5770556
Showing 1 changed file with 40 additions and 21 deletions.
61 changes: 40 additions & 21 deletions consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -752,7 +752,7 @@ func (c *consumer) newBrokerConsumer(broker *Broker) *brokerConsumer {
bc := &brokerConsumer{
consumer: c,
broker: broker,
input: make(chan *partitionConsumer),
input: make(chan *partitionConsumer, 4096),
newSubscriptions: make(chan []*partitionConsumer),
wait: make(chan none),
subscriptions: make(map[*partitionConsumer]none),
Expand All @@ -771,36 +771,55 @@ func (c *consumer) newBrokerConsumer(broker *Broker) *brokerConsumer {
// it nil if no new subscriptions are available. We also write to `wait` only when new subscriptions is available,
// so the main goroutine can block waiting for work if it has none.
func (bc *brokerConsumer) subscriptionManager() {
var buffer []*partitionConsumer
var partitionConsumers []*partitionConsumer

for {
if len(buffer) > 0 {
select {
case event, ok := <-bc.input:
if !ok {
goto done
}
buffer = append(buffer, event)
case bc.newSubscriptions <- buffer:
buffer = nil
case bc.wait <- none{}:

// check for any partition consumer asking to subscribe if there aren't
// any, trigger the network request by sending "nil" to the
// newSubscriptions channel
select {
case pc, ok := <-bc.input:
if !ok {
goto done
}
} else {
select {
case event, ok := <-bc.input:
if !ok {
goto done

// add to list of subscribing consumers
partitionConsumers = append(partitionConsumers, pc)

// wait up to 250ms to drain input of any further incoming
// subscriptions
for batchComplete := false; !batchComplete; {
select {
case pc, ok := <-bc.input:
if !ok {
goto done
}

partitionConsumers = append(partitionConsumers, pc)
case <-time.After(250 * time.Millisecond):
batchComplete = true
}
buffer = append(buffer, event)
case bc.newSubscriptions <- nil:
}

Logger.Printf(
"consumer/broker/%d accumulated %d new subscriptions\n",
bc.broker.ID(), len(partitionConsumers))

bc.wait <- none{}
bc.newSubscriptions <- partitionConsumers

// clear out the batch
partitionConsumers = nil

case bc.newSubscriptions <- nil:
}
}

done:
close(bc.wait)
if len(buffer) > 0 {
bc.newSubscriptions <- buffer
if len(partitionConsumers) > 0 {
bc.newSubscriptions <- partitionConsumers
}
close(bc.newSubscriptions)
}
Expand Down

0 comments on commit 5770556

Please sign in to comment.