From a9627a09a22a27d0efe0495292264ac0a8e3ea82 Mon Sep 17 00:00:00 2001
From: Dmytro Milinevskyi
Date: Wed, 23 Mar 2022 11:41:00 +0100
Subject: [PATCH 1/4] consumer: avoid obscure sync between subscriptionManager and subscriptionConsumer threads

since the subscription manager was improved to batch subscriptions (see
https://github.com/Shopify/sarama/pull/2109/commits/dadcd808a5f6c6394e91f6a614818cf8eab732de)
it created a deadlock when new subscriptions are added after a rebalance
---
 consumer.go | 72 +++++++++++++++++++++--------------------------------
 1 file changed, 28 insertions(+), 44 deletions(-)

diff --git a/consumer.go b/consumer.go
index b2039e9c0..2ed817aee 100644
--- a/consumer.go
+++ b/consumer.go
@@ -850,7 +850,6 @@ type brokerConsumer struct {
 	input            chan *partitionConsumer
 	newSubscriptions chan []*partitionConsumer
 	subscriptions    map[*partitionConsumer]none
-	wait             chan none
 	acks             sync.WaitGroup
 	refs             int
 }
@@ -861,7 +860,6 @@ func (c *consumer) newBrokerConsumer(broker *Broker) *brokerConsumer {
 		broker:           broker,
 		input:            make(chan *partitionConsumer),
 		newSubscriptions: make(chan []*partitionConsumer),
-		wait:             make(chan none, 1),
 		subscriptions:    make(map[*partitionConsumer]none),
 		refs:             0,
 	}
@@ -875,72 +873,59 @@
 // The subscriptionManager constantly accepts new subscriptions on `input` (even when the main subscriptionConsumer
 // goroutine is in the middle of a network request) and batches it up. The main worker goroutine picks
 // up a batch of new subscriptions between every network request by reading from `newSubscriptions`, so we give
-// it nil if no new subscriptions are available. We also write to `wait` only when new subscriptions is available,
-// so the main goroutine can block waiting for work if it has none.
+// it nil if no new subscriptions are available.
 func (bc *brokerConsumer) subscriptionManager() {
-	var partitionConsumers []*partitionConsumer
+	defer close(bc.newSubscriptions)
 
 	for {
-		// check for any partition consumer asking to subscribe if there aren't
-		// any, trigger the network request by sending "nil" to the
+		var partitionConsumers []*partitionConsumer
+
+		// Check for any partition consumer asking to subscribe; if there aren't
+		// any, trigger the network request (to fetch Kafka messages) by sending "nil" to the
 		// newSubscriptions channel
 		select {
 		case pc, ok := <-bc.input:
 			if !ok {
-				goto done
+				return
 			}
-
-			// add to list of subscribing consumers
 			partitionConsumers = append(partitionConsumers, pc)
+		case bc.newSubscriptions <- nil:
+			continue
+		}
-			// wait up to 250ms to drain input of any further incoming
-			// subscriptions
-			for batchComplete := false; !batchComplete; {
-				select {
-				case pc, ok := <-bc.input:
-					if !ok {
-						goto done
-					}
-
-					partitionConsumers = append(partitionConsumers, pc)
-				case <-time.After(250 * time.Millisecond):
-					batchComplete = true
+		// wait up to 250ms to drain input of any further incoming
+		// subscriptions
+		for batchComplete := false; !batchComplete; {
+			select {
+			case pc, ok := <-bc.input:
+				if !ok {
+					return
 				}
-			}
 
-			Logger.Printf(
-				"consumer/broker/%d accumulated %d new subscriptions\n",
-				bc.broker.ID(), len(partitionConsumers))
-
-			bc.wait <- none{}
-			bc.newSubscriptions <- partitionConsumers
-
-			// clear out the batch
-			partitionConsumers = nil
-
-		case bc.newSubscriptions <- nil:
+				partitionConsumers = append(partitionConsumers, pc)
+			case <-time.After(250 * time.Millisecond):
+				batchComplete = true
+			}
 		}
-	}
 
-done:
-	close(bc.wait)
-	if len(partitionConsumers) > 0 {
+		Logger.Printf(
+			"consumer/broker/%d accumulated %d new subscriptions\n",
+			bc.broker.ID(), len(partitionConsumers))
+
 		bc.newSubscriptions <- partitionConsumers
 	}
-	close(bc.newSubscriptions)
 }
 
 // subscriptionConsumer ensures we will get nil right away if no new subscriptions is available
+// this is the main loop that fetches Kafka messages
 func (bc *brokerConsumer) subscriptionConsumer() {
-	<-bc.wait // wait for our first piece of work
-
 	for newSubscriptions := range bc.newSubscriptions {
 		bc.updateSubscriptions(newSubscriptions)
 
 		if len(bc.subscriptions) == 0 {
 			// We're about to be shut down or we're about to receive more subscriptions.
-			// Either way, the signal just hasn't propagated to our goroutine yet.
-			<-bc.wait
+			// Take a small nap to avoid burning the CPU.
+			time.Sleep(250 * time.Millisecond)
 			continue
 		}
@@ -1040,7 +1025,6 @@ func (bc *brokerConsumer) abort(err error) {
 
 	for newSubscriptions := range bc.newSubscriptions {
 		if len(newSubscriptions) == 0 {
-			<-bc.wait
 			continue
 		}
 		for _, child := range newSubscriptions {

From 76608c09bf93a0c965a09133315d007644ead1b2 Mon Sep 17 00:00:00 2001
From: Dmytro Milinevskyi
Date: Tue, 29 Mar 2022 12:08:39 +0200
Subject: [PATCH 2/4] tests:functional: reproduce consumer deadlock

---
 functional_consumer_test.go | 124 ++++++++++++++++++++++++++++++++++++
 1 file changed, 124 insertions(+)

diff --git a/functional_consumer_test.go b/functional_consumer_test.go
index f8de2b4d5..2dd6cf541 100644
--- a/functional_consumer_test.go
+++ b/functional_consumer_test.go
@@ -4,12 +4,14 @@
 package sarama
 
 import (
+	"context"
 	"errors"
 	"fmt"
 	"math"
 	"os"
 	"sort"
 	"strconv"
+	"strings"
 	"sync"
 	"testing"
 	"time"
@@ -282,6 +284,128 @@ func TestReadOnlyAndAllCommittedMessages(t *testing.T) {
 	}
 }
 
+func TestConsumerGroupDeadlock(t *testing.T) {
+	setupFunctionalTest(t)
+	defer teardownFunctionalTest(t)
+
+	require := require.New(t)
+
+	const topic = "test_consumer_group_rebalance_test_topic"
+	const msgQty = 50
+	partitionsQty := len(FunctionalTestEnv.KafkaBrokerAddrs) * 3
+
+	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
+
+	config := NewConfig()
+	config.ClientID = t.Name()
+	config.Producer.Return.Successes = true
+	config.ChannelBufferSize = 2 * msgQty
+
+	client, err := NewClient(FunctionalTestEnv.KafkaBrokerAddrs, config)
+	require.NoError(err)
+
+	admin, err := NewClusterAdminFromClient(client)
+	require.NoError(err)
+
+	cgName := "test_consumer_group_rebalance_consumer_group"
+
+	err = admin.DeleteConsumerGroup(cgName)
+	if err != nil {
+		t.Logf("failed to delete consumer group: %s", err)
+	}
+
+	err = admin.DeleteTopic(topic)
+	if err != nil {
+		t.Logf("failed to delete topic: %s", err)
+	}
+
+	// it takes time to delete a topic, the API is not synchronous
+	for i := 0; i < 5; i++ {
+		err = admin.CreateTopic(topic, &TopicDetail{NumPartitions: int32(partitionsQty), ReplicationFactor: 1}, false)
+		if err == nil {
+			break
+		}
+		if err == ErrTopicAlreadyExists || strings.Contains(err.Error(), "is marked for deletion") {
+			time.Sleep(500 * time.Millisecond)
+			continue
+		}
+		break
+	}
+	require.NoError(err)
+	defer admin.DeleteTopic(topic)
+
+	var wg sync.WaitGroup
+
+	consumer, err := NewConsumerFromClient(client)
+	require.NoError(err)
+
+	ch := make(chan string, msgQty)
+	for i := 0; i < partitionsQty; i++ {
+		time.Sleep(250 * time.Millisecond) // ensure delays between the "claims"
+		wg.Add(1)
+		go func(i int) {
+			defer wg.Done()
+
+			pConsumer, err := consumer.ConsumePartition(topic, int32(i), OffsetOldest)
+			require.NoError(err)
+			defer pConsumer.Close()
+
+			for {
+				select {
+				case <-ctx.Done():
+					return
+				case msg, ok := <-pConsumer.Messages():
+					if !ok {
+						return
+					}
+					// t.Logf("consumer-group %d consumed: %v from %s/%d/%d", i, msg.Value, msg.Topic, msg.Partition, msg.Offset)
+					ch <- string(msg.Value)
+				}
+			}
+		}(i)
+	}
+
+	producer, err := NewSyncProducerFromClient(client)
+	require.NoError(err)
+
+	for i := 0; i < msgQty; i++ {
+		msg := &ProducerMessage{
+			Topic: topic,
+			Value: StringEncoder(strconv.FormatInt(int64(i), 10)),
+		}
+		_, _, err := producer.SendMessage(msg)
+		require.NoError(err)
+	}
+
+	var received []string
+	func() {
+		for len(received) < msgQty {
+			select {
+			case <-ctx.Done():
+				return
+			case msg := <-ch:
+				received = append(received, msg)
+				// t.Logf("received: %s, count: %d", msg, len(received))
+			}
+		}
+	}()
+
+	cancel()
+
+	require.Equal(msgQty, len(received))
+
+	err = producer.Close()
+	require.NoError(err)
+
+	err = consumer.Close()
+	require.NoError(err)
+
+	err = client.Close()
+	require.NoError(err)
+
+	wg.Wait()
+}
+
 func prodMsg2Str(prodMsg *ProducerMessage) string {
 	return fmt.Sprintf("{offset: %d, value: %s}", prodMsg.Offset, string(prodMsg.Value.(StringEncoder)))
 }

From e0c263a1ad2d6ae5af4cd2a886e72d52c0c5a220 Mon Sep 17 00:00:00 2001
From: Dmytro Milinevskyi
Date: Thu, 31 Mar 2022 02:12:38 +0200
Subject: [PATCH 3/4] consumer: reduce amount of nap time when waiting for new subscriptions

---
 consumer.go | 20 +++++++++++---------
 1 file changed, 11 insertions(+), 9 deletions(-)

diff --git a/consumer.go b/consumer.go
index 2ed817aee..46bdadeee 100644
--- a/consumer.go
+++ b/consumer.go
@@ -96,6 +96,9 @@ type Consumer interface {
 	ResumeAll()
 }
 
+// max time to wait for more partition subscriptions
+const partitionConsumersBatchTimeout = 100 * time.Millisecond
+
 type consumer struct {
 	conf     *Config
 	children map[string]map[int32]*partitionConsumer
@@ -893,20 +896,17 @@ func (bc *brokerConsumer) subscriptionManager() {
 			continue
 		}
-		// wait up to 250ms to drain input of any further incoming
-		// subscriptions
+		// drain input of any further incoming subscriptions
+		timer := time.NewTimer(partitionConsumersBatchTimeout)
 		for batchComplete := false; !batchComplete; {
 			select {
-			case pc, ok := <-bc.input:
-				if !ok {
-					return
-				}
-
+			case pc := <-bc.input:
 				partitionConsumers = append(partitionConsumers, pc)
-			case <-time.After(250 * time.Millisecond):
+			case <-timer.C:
 				batchComplete = true
 			}
 		}
+		timer.Stop()
 
 		Logger.Printf(
 			"consumer/broker/%d accumulated %d new subscriptions\n",
 			bc.broker.ID(), len(partitionConsumers))
@@ -925,7 +925,7 @@ func (bc *brokerConsumer) subscriptionConsumer() {
 		if len(bc.subscriptions) == 0 {
 			// We're about to be shut down or we're about to receive more subscriptions.
 			// Take a small nap to avoid burning the CPU.
-			time.Sleep(250 * time.Millisecond)
+			time.Sleep(partitionConsumersBatchTimeout)
 			continue
 		}
 
@@ -1040,7 +1025,6 @@ func (bc *brokerConsumer) abort(err error) {
 
 	for newSubscriptions := range bc.newSubscriptions {
 		if len(newSubscriptions) == 0 {
+			// Take a small nap to avoid burning the CPU.
+			time.Sleep(partitionConsumersBatchTimeout)
 			continue
 		}
 		for _, child := range newSubscriptions {

From 9a9a211cb25afda108adc80bc19d7a8375eebe23 Mon Sep 17 00:00:00 2001
From: Dominic Evans
Date: Wed, 13 Apr 2022 22:38:57 +0100
Subject: [PATCH 4/4] fix(test): use errors.Is and ignore DeleteTopic rc

Fixing golangci-lint warnings
---
 functional_consumer_test.go | 6 ++++--
 1 file changed, 4 insertions(+), 2 deletions(-)

diff --git a/functional_consumer_test.go b/functional_consumer_test.go
index 2dd6cf541..80ca05b63 100644
--- a/functional_consumer_test.go
+++ b/functional_consumer_test.go
@@ -325,14 +325,16 @@ func TestConsumerGroupDeadlock(t *testing.T) {
 		if err == nil {
 			break
 		}
-		if err == ErrTopicAlreadyExists || strings.Contains(err.Error(), "is marked for deletion") {
+		if errors.Is(err, ErrTopicAlreadyExists) || strings.Contains(err.Error(), "is marked for deletion") {
 			time.Sleep(500 * time.Millisecond)
 			continue
 		}
 		break
 	}
 	require.NoError(err)
-	defer admin.DeleteTopic(topic)
+	defer func() {
+		_ = admin.DeleteTopic(topic)
+	}()
 
 	var wg sync.WaitGroup
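
Appendix (not part of the patch series): the following is a minimal, self-contained Go sketch of the batching pattern that the reworked subscriptionManager relies on, included to make it easier to see why the old `wait` channel is no longer needed. The names used here (manager, batchTimeout, an int payload, the worker loop in main) are illustrative assumptions and do not exist in sarama; the real implementation is the brokerConsumer code in consumer.go shown in the patches above.

// Illustrative sketch only; assumed names, not sarama code.
package main

import (
	"fmt"
	"time"
)

const batchTimeout = 100 * time.Millisecond

// manager batches items arriving on input. When nothing is pending it offers nil to
// the worker instead, so the worker never needs a separate "wait" signal channel.
func manager(input <-chan int, batches chan<- []int) {
	defer close(batches)
	for {
		var batch []int

		select {
		case item, ok := <-input:
			if !ok {
				return
			}
			batch = append(batch, item)
		case batches <- nil:
			// No pending items: let the worker run its next cycle with an empty batch.
			continue
		}

		// Drain anything else that arrives within the batching window.
		timer := time.NewTimer(batchTimeout)
		for done := false; !done; {
			select {
			case item, ok := <-input:
				if ok {
					batch = append(batch, item)
				} else {
					done = true
				}
			case <-timer.C:
				done = true
			}
		}
		timer.Stop()

		batches <- batch
	}
}

func main() {
	input := make(chan int)
	batches := make(chan []int)
	go manager(input, batches)

	// Producer side: the counterpart of partition consumers subscribing to a broker.
	go func() {
		for i := 0; i < 5; i++ {
			input <- i
		}
		close(input)
	}()

	// Worker side: the counterpart of subscriptionConsumer's main loop.
	for batch := range batches {
		if batch == nil {
			// Nothing new; nap briefly instead of blocking on a signal channel.
			time.Sleep(batchTimeout)
			continue
		}
		fmt.Println("got batch:", batch)
	}
}

The trade-off mirrors the patch series: the worker naps briefly when it has nothing to do instead of blocking on a second synchronisation channel, which removes the obscure coupling between the two goroutines that the first patch describes, at the cost of a short polling delay that the third patch then tightens.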