Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: avoid starvation in subscriptionManager #2109

Merged
merged 1 commit into from
Feb 25, 2022
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
60 changes: 39 additions & 21 deletions consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -861,7 +861,7 @@ func (c *consumer) newBrokerConsumer(broker *Broker) *brokerConsumer {
broker: broker,
input: make(chan *partitionConsumer),
newSubscriptions: make(chan []*partitionConsumer),
wait: make(chan none),
wait: make(chan none, 1),
subscriptions: make(map[*partitionConsumer]none),
refs: 0,
}
Expand All @@ -878,36 +878,54 @@ func (c *consumer) newBrokerConsumer(broker *Broker) *brokerConsumer {
// it nil if no new subscriptions are available. We also write to `wait` only when new subscriptions is available,
// so the main goroutine can block waiting for work if it has none.
func (bc *brokerConsumer) subscriptionManager() {
var buffer []*partitionConsumer
var partitionConsumers []*partitionConsumer

for {
if len(buffer) > 0 {
select {
case event, ok := <-bc.input:
if !ok {
goto done
}
buffer = append(buffer, event)
case bc.newSubscriptions <- buffer:
buffer = nil
case bc.wait <- none{}:
// check for any partition consumer asking to subscribe if there aren't
// any, trigger the network request by sending "nil" to the
// newSubscriptions channel
select {
case pc, ok := <-bc.input:
if !ok {
goto done
}
} else {
select {
case event, ok := <-bc.input:
if !ok {
goto done

// add to list of subscribing consumers
partitionConsumers = append(partitionConsumers, pc)

// wait up to 250ms to drain input of any further incoming
// subscriptions
for batchComplete := false; !batchComplete; {
select {
case pc, ok := <-bc.input:
if !ok {
goto done
}

partitionConsumers = append(partitionConsumers, pc)
case <-time.After(250 * time.Millisecond):
batchComplete = true
}
buffer = append(buffer, event)
case bc.newSubscriptions <- nil:
}

Logger.Printf(
"consumer/broker/%d accumulated %d new subscriptions\n",
bc.broker.ID(), len(partitionConsumers))

bc.wait <- none{}
bc.newSubscriptions <- partitionConsumers

// clear out the batch
partitionConsumers = nil

case bc.newSubscriptions <- nil:
}
}

done:
close(bc.wait)
if len(buffer) > 0 {
bc.newSubscriptions <- buffer
if len(partitionConsumers) > 0 {
bc.newSubscriptions <- partitionConsumers
}
close(bc.newSubscriptions)
}
Expand Down