diff --git a/consumer.go b/consumer.go index df13c2996..b00b783ef 100644 --- a/consumer.go +++ b/consumer.go @@ -859,7 +859,7 @@ func (c *consumer) newBrokerConsumer(broker *Broker) *brokerConsumer { bc := &brokerConsumer{ consumer: c, broker: broker, - input: make(chan *partitionConsumer), + input: make(chan *partitionConsumer, 4096), newSubscriptions: make(chan []*partitionConsumer), wait: make(chan none), subscriptions: make(map[*partitionConsumer]none), @@ -878,36 +878,54 @@ func (c *consumer) newBrokerConsumer(broker *Broker) *brokerConsumer { // it nil if no new subscriptions are available. We also write to `wait` only when new subscriptions is available, // so the main goroutine can block waiting for work if it has none. func (bc *brokerConsumer) subscriptionManager() { - var buffer []*partitionConsumer + var partitionConsumers []*partitionConsumer for { - if len(buffer) > 0 { - select { - case event, ok := <-bc.input: - if !ok { - goto done - } - buffer = append(buffer, event) - case bc.newSubscriptions <- buffer: - buffer = nil - case bc.wait <- none{}: + // check for any partition consumer asking to subscribe if there aren't + // any, trigger the network request by sending "nil" to the + // newSubscriptions channel + select { + case pc, ok := <-bc.input: + if !ok { + goto done } - } else { - select { - case event, ok := <-bc.input: - if !ok { - goto done + + // add to list of subscribing consumers + partitionConsumers = append(partitionConsumers, pc) + + // wait up to 250ms to drain input of any further incoming + // subscriptions + for batchComplete := false; !batchComplete; { + select { + case pc, ok := <-bc.input: + if !ok { + goto done + } + + partitionConsumers = append(partitionConsumers, pc) + case <-time.After(250 * time.Millisecond): + batchComplete = true } - buffer = append(buffer, event) - case bc.newSubscriptions <- nil: } + + Logger.Printf( + "consumer/broker/%d accumulated %d new subscriptions\n", + bc.broker.ID(), len(partitionConsumers)) + + bc.wait <- none{} + bc.newSubscriptions <- partitionConsumers + + // clear out the batch + partitionConsumers = nil + + case bc.newSubscriptions <- nil: } } done: close(bc.wait) - if len(buffer) > 0 { - bc.newSubscriptions <- buffer + if len(partitionConsumers) > 0 { + bc.newSubscriptions <- partitionConsumers } close(bc.newSubscriptions) }