fix: prevent deadlock between subscription manager and consumer goroutines #2194
Conversation
…tionConsumer threads. Since the subscription manager was improved to batch subscriptions (see IBM@dadcd80), it created a deadlock in the case when new subscriptions are added after a rebalance.
I was trying to troubleshoot another bug and wrote this:

import (
	"context"
	"fmt"
	"math/rand"
	"strconv"
	"testing"
	"time"

	"github.com/stretchr/testify/require"
)

type inlineHandler func(sess ConsumerGroupSession, claim ConsumerGroupClaim) error

func (inlineHandler) Setup(_ ConsumerGroupSession) error   { return nil }
func (inlineHandler) Cleanup(_ ConsumerGroupSession) error { return nil }
func (h inlineHandler) ConsumeClaim(sess ConsumerGroupSession, claim ConsumerGroupClaim) error {
	return h(sess, claim)
}

func TestConsumerGroupRebalance(t *testing.T) {
	const topic = "test-topic"

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	config := NewConfig()
	config.Producer.Return.Successes = true
	config.Consumer.Offsets.Initial = OffsetOldest

	c, err := NewClient([]string{"localhost:9092"}, config)
	require.NoError(t, err)

	admin, err := NewClusterAdminFromClient(c)
	require.NoError(t, err)
	err = admin.DeleteTopic(topic)
	t.Log(err)
	err = admin.CreateTopic(topic, &TopicDetail{NumPartitions: 4, ReplicationFactor: 1}, false)
	t.Log(err)

	p, err := NewSyncProducerFromClient(c)
	require.NoError(t, err)

	g, err := NewConsumerGroupFromClient("group", c)
	require.NoError(t, err)

	timer := time.NewTimer(1 * time.Minute)
	defer timer.Stop()

	resets := make(chan struct{})
	go func() {
		for {
			select {
			case <-ctx.Done():
				return
			case <-timer.C:
				timer.Reset(time.Duration(rand.Float64()*100_000) * time.Microsecond)
				resets <- struct{}{}
			}
		}
	}()

	ch := make(chan string)
	for i := 0; i < 10; i++ {
		go func(g ConsumerGroup) {
			for {
				if ctx.Err() != nil { // context was cancelled
					t.Log("consumer-group stopped")
					break
				}
				err := g.Consume(ctx, []string{topic}, inlineHandler(func(sess ConsumerGroupSession, claim ConsumerGroupClaim) error {
					for {
						select {
						case <-resets:
							return nil
						case <-ctx.Done():
							return ctx.Err()
						case message, ok := <-claim.Messages():
							if !ok {
								return nil
							}
							ch <- string(message.Value)
							sess.MarkMessage(message, "")
						}
					}
				}))
				t.Logf("consumer-group disconnected: %s", err)
			}
		}(g)
	}

	time.Sleep(2 * time.Second)
	t.Log("booted")
	timer.Reset(0)

	n := 1000
	go func() {
		for i := 0; i < n; i++ {
			time.Sleep(time.Duration(rand.Float64()*1_000) * time.Microsecond)
			m := &ProducerMessage{
				Topic: topic,
				Value: StringEncoder(strconv.FormatInt(int64(i), 10)),
			}
			_, _, err := p.SendMessage(m)
			require.NoError(t, err)
			//p.Input() <- m
		}
	}()

	m := make(map[string]struct{}, n)
	for len(m) < n {
		line := <-ch
		m[line] = struct{}{}
		t.Logf("received: %s, count: %d", line, len(m))
	}

	cancel()
	err = g.Close()
	require.NoError(t, err)
	err = p.Close()
	require.NoError(t, err)
	err = c.Close()
	require.NoError(t, err)
}

It introduces chaos, randomly sleeping and exiting claims to trigger rebalances.
When broken, it just gets stuck in a disconnect loop:
I verified that:
However, I tried applying this patch and it didn't seem to fix it; it just gets stuck in the "disconnected" loop.
Thanks @lavoiesl for writing a unit test ❤️. I will try to reproduce the problem with it and see what is wrong.
@lavoiesl If I understand correctly… Since I'm back to this PR, I will try to write a functional test that illustrates the original problem I was trying to fix.
Unfortunately I was not able to create a simple test case that could robustly reproduce the issue with consumer groups. With Kafka launched with the help of docker-compose, if I run
I can reproduce the deadlock:
Fair point, here's a similar version, but with different clients:

p, err := NewSyncProducerFromClient(c)
require.NoError(t, err)

n := 1000
ch := make(chan string, n)
for i := 0; i < 27; i++ {
	go func(i int) {
		time.Sleep(time.Duration(i) * time.Second)
		for {
			if ctx.Err() != nil { // context was cancelled
				t.Log("consumer-group stopped")
				break
			}
			config := NewConfig()
			config.ClientID = fmt.Sprintf("test-%d", i)
			config.Consumer.Offsets.Initial = OffsetOldest
			config.Consumer.Group.Heartbeat.Interval = 100 * time.Millisecond
			config.Consumer.Group.Rebalance.Timeout = 100 * time.Millisecond
			g, err := NewConsumerGroup([]string{"sarama.railgun:9092"}, "group", config)
			require.NoError(t, err)
			alive := true
			for alive {
				err = g.Consume(ctx, []string{topic}, inlineHandler(func(sess ConsumerGroupSession, claim ConsumerGroupClaim) error {
					for {
						select {
						case <-ctx.Done():
							return ctx.Err()
						case message, ok := <-claim.Messages():
							if !ok {
								return nil
							}
							if rand.Float64() < 0.05 {
								alive = false
								return errors.New("reset")
							}
							ch <- fmt.Sprintf("%d-%s", i, message.Value)
							sess.MarkMessage(message, "")
						}
					}
				}))
				t.Logf("consumer-group disconnected: %s", err)
			}
			require.NoError(t, g.Close())
		}
	}(i)
}

time.Sleep(5 * time.Second)
t.Log("booted")

go func() {
	for i := 0; i < n; i++ {
		m := &ProducerMessage{
			Topic: topic,
			Value: StringEncoder(strconv.FormatInt(int64(i), 10)),
		}
		_, _, err := p.SendMessage(m)
		t.Logf("send message %d: %s", i, err)
	}
}()

m := make(map[string]struct{}, n)
for len(m) < n {
	line := <-ch
	m[line] = struct{}{}
	t.Logf("received: %s, count: %d", line, len(m))
}

cancel()
err = p.Close()
require.NoError(t, err)
err = c.Close()
require.NoError(t, err)

It eventually succeeds, but with zillions of disconnects. No, I do not have a real-world example of this happening; I don't have any production apps that use sarama's unreleased versions.
@lavoiesl thanks! I think the extra delay I've introduced (250ms) causes the test to run slower. I think it's fair to lower the delay to, say, 100ms.
@lavoiesl I think I've spotted another regression: the loop creates a new timer each time a new subscription is added, so the overall "wait" time is extended by another 250ms per subscription.
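To illustrate the shape of that regression, here is a minimal, hypothetical sketch (collectBatch and batchWindow are made-up names, not Sarama's actual subscription-manager code): starting the timer once per batch bounds the extra wait to a single window, whereas resetting or recreating it on every new subscription keeps pushing the deadline out.

// Hypothetical illustration only; collectBatch and batchWindow are not part of Sarama.
package main

import (
	"fmt"
	"time"
)

// collectBatch gathers subscriptions that arrive within a single batch window.
func collectBatch(subscriptions <-chan string, batchWindow time.Duration) []string {
	first := <-subscriptions
	batch := []string{first}

	// Start the window once, when the first subscription arrives.
	deadline := time.NewTimer(batchWindow)
	defer deadline.Stop()

	for {
		select {
		case sub := <-subscriptions:
			batch = append(batch, sub)
			// Regression shape: calling deadline.Reset(batchWindow) here (or creating a
			// new timer) on every subscription extends the wait by another window each
			// time, so a steady stream of subscriptions delays the batch indefinitely.
		case <-deadline.C:
			return batch
		}
	}
}

func main() {
	subs := make(chan string, 3)
	subs <- "topic-a/0"
	subs <- "topic-a/1"
	subs <- "topic-b/0"
	fmt.Println(collectBatch(subs, 100*time.Millisecond))
}

In this sketch the worst-case extra latency is one batchWindow regardless of how many subscriptions arrive during it.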
21d1414 to e0c263a: Fixing golangci-lint warnings
LGTM - the changes look reasonable to me now. Let's go ahead and merge this and continue to perform testing on main prior to cutting the next release.
Thanks a lot, Dominic.
Since the subscription manager was improved to batch the subscriptions (see dadcd80),
it created a deadlock in the case when new subscriptions are added after a rebalance.
After a rebalance, the subscription is stuck on pushing to the wait channel,
which is drained only if there are no active subscriptions.
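As a rough, self-contained sketch of that deadlock shape (the manager type, active counter, and wait channel below are illustrative stand-ins, not Sarama's actual implementation): the subscribing goroutine blocks sending on wait, while the manager loop only drains wait when there are no active subscriptions, so neither side can make progress.

// A rough sketch of the deadlock described in the commit message above.
// manager, active, and wait are illustrative names, not Sarama's actual types.
package main

import (
	"fmt"
	"time"
)

type manager struct {
	active int           // subscriptions currently being served
	wait   chan struct{} // unbuffered: new subscriptions signal the manager here
}

// run drains wait only while there are no active subscriptions.
func (m *manager) run(stop <-chan struct{}) {
	for {
		if m.active == 0 {
			select {
			case <-m.wait: // accept a new subscription only when idle
			case <-stop:
				return
			}
			continue
		}
		select {
		case <-stop:
			return
		default:
			time.Sleep(10 * time.Millisecond) // busy serving existing subscriptions
		}
	}
}

// subscribe is what a new claim does after a rebalance: it announces itself by
// pushing to wait, and blocks until the manager drains the channel.
func (m *manager) subscribe() {
	m.wait <- struct{}{}
}

func main() {
	// One subscription is already active when the rebalance hands over a new
	// partition, so the manager never becomes idle and never drains wait.
	m := &manager{active: 1, wait: make(chan struct{})}
	stop := make(chan struct{})
	go m.run(stop)

	done := make(chan struct{})
	go func() {
		m.subscribe() // the new subscription is stuck here
		close(done)
	}()

	select {
	case <-done:
		fmt.Println("subscription registered")
	case <-time.After(500 * time.Millisecond):
		fmt.Println("deadlock: subscribe() is still blocked pushing to wait")
	}
	close(stop)
}

Under these assumptions the new claim never gets registered, which matches the "stuck on pushing to the wait channel" behaviour described in the commit message.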