Skip to content

Commit

Permalink
Merge pull request #2108 from Shopify/dnwe/fix-consumer-from-follower
Browse files Browse the repository at this point in the history
fix: clear preferredReadReplica if broker shutdown
  • Loading branch information
dnwe authored Jan 12, 2022
2 parents 9d82f86 + 1f754e2 commit a059adb
Show file tree
Hide file tree
Showing 3 changed files with 162 additions and 2 deletions.
14 changes: 12 additions & 2 deletions consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -351,7 +351,6 @@ func (child *partitionConsumer) dispatcher() {
child.broker = nil
}

Logger.Printf("consumer/%s/%d finding new broker\n", child.topic, child.partition)
if err := child.dispatch(); err != nil {
child.sendError(err)
child.trigger <- none{}
Expand All @@ -372,6 +371,14 @@ func (child *partitionConsumer) preferredBroker() (*Broker, error) {
if err == nil {
return broker, nil
}
Logger.Printf(
"consumer/%s/%d failed to find active broker for preferred read replica %d - will fallback to leader",
child.topic, child.partition, child.preferredReadReplica)

// if we couldn't find it, discard the replica preference and trigger a
// metadata refresh whilst falling back to consuming from the leader again
child.preferredReadReplica = invalidPreferredReplicaID
_ = child.consumer.client.RefreshMetadata(child.topic)
}

// if preferred replica cannot be found fallback to leader
Expand Down Expand Up @@ -856,6 +863,9 @@ func (bc *brokerConsumer) handleResponses() {
if preferredBroker, err := child.preferredBroker(); err == nil {
if bc.broker.ID() != preferredBroker.ID() {
// not an error but needs redispatching to consume from preferred replica
Logger.Printf(
"consumer/broker/%d abandoned in favor of preferred replica broker/%d\n",
bc.broker.ID(), preferredBroker.ID())
child.trigger <- none{}
delete(bc.subscriptions, child)
}
Expand All @@ -864,7 +874,7 @@ func (bc *brokerConsumer) handleResponses() {
}

// Discard any replica preference.
child.preferredReadReplica = -1
child.preferredReadReplica = invalidPreferredReplicaID

switch result {
case errTimedOut:
Expand Down
127 changes: 127 additions & 0 deletions functional_consumer_follower_fetch_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
//go:build functional
// +build functional

package sarama

import (
"context"
"fmt"
"strconv"
"sync"
"testing"
"time"
)

// TestConsumerFetchFollowerFailover checks that a partition consumer keeps
// receiving messages while the follower broker selected via RackID is stopped
// and then started again.
//
// The test produces numMsg messages in the background and consumes numMsg
// messages from the oldest offset in three phases: before the follower is
// stopped, while it is stopped, and after it has been started again.
func TestConsumerFetchFollowerFailover(t *testing.T) {
	const (
		topic  = "test.1"
		numMsg = 1000
		// recvTimeout bounds the wait for any single message so that a stalled
		// consumer fails the test (and runs the cleanup below) instead of
		// hanging until the global test timeout.
		recvTimeout = time.Minute
	)

	newConfig := func() *Config {
		config := NewConfig()
		config.ClientID = t.Name()
		config.Version = V2_8_0_0
		config.Producer.Return.Successes = true
		return config
	}

	config := newConfig()

	// pick a partition and find the ID for one of the follower brokers
	admin, err := NewClusterAdmin(FunctionalTestEnv.KafkaBrokerAddrs, config)
	if err != nil {
		t.Fatal(err)
	}
	defer admin.Close()

	metadata, err := admin.DescribeTopics([]string{topic})
	if err != nil {
		t.Fatal(err)
	}
	if len(metadata) == 0 || len(metadata[0].Partitions) == 0 {
		t.Fatalf("no partition metadata returned for topic %s", topic)
	}
	partition := metadata[0].Partitions[0]
	leader := partition.Leader
	follower := int32(-1)
	for _, replica := range partition.Replicas {
		if replica != leader {
			follower = replica
			break
		}
	}
	if follower < 0 {
		// without a follower there is nothing to fail over from
		t.Fatalf("topic %s partition %d has no replica other than leader kafka-%d", topic, partition.ID, leader)
	}

	t.Logf("topic %s has leader kafka-%d and our chosen follower is kafka-%d", topic, leader, follower)

	// set our RackID to the follower's broker ID so our fetch requests should
	// end up being served by that follower
	// NOTE(review): presumably the test cluster configures each broker's rack
	// to match its broker ID - confirm against the docker-compose setup
	config.RackID = strconv.FormatInt(int64(follower), 10)

	consumer, err := NewConsumer(FunctionalTestEnv.KafkaBrokerAddrs, config)
	if err != nil {
		t.Fatal(err)
	}
	// deferred separately so the consumer is closed even if ConsumePartition
	// fails; LIFO order still closes the partition consumer first
	defer consumer.Close()

	pc, err := consumer.ConsumePartition(topic, partition.ID, OffsetOldest)
	if err != nil {
		t.Fatal(err)
	}
	defer pc.Close()

	producer, err := NewSyncProducer(FunctionalTestEnv.KafkaBrokerAddrs, config)
	if err != nil {
		t.Fatal(err)
	}
	defer producer.Close()

	var wg sync.WaitGroup
	stop := make(chan struct{})

	wg.Add(1)
	go func() {
		defer wg.Done()
		for i := 0; i < numMsg; i++ {
			msg := &ProducerMessage{
				Topic: topic, Key: nil, Value: StringEncoder(fmt.Sprintf("%s %-3d", t.Name(), i)),
			}
			if _, offset, err := producer.SendMessage(msg); err != nil {
				t.Error(i, err)
			} else if offset%50 == 0 {
				t.Logf("sent: %d\n", offset)
			}
			// pace the sends, but bail out promptly if the test is ending
			select {
			case <-stop:
				return
			case <-time.After(time.Millisecond * 25):
			}
		}
	}()
	// Registered after the producer's deferred Close so that it runs first:
	// the goroutine must have exited before the producer is closed and before
	// the test returns, otherwise it would log on a completed test.
	defer func() {
		close(stop)
		wg.Wait()
	}()

	// recvUntil consumes messages until `received` reaches target, failing the
	// test if the channel is closed or no message arrives within recvTimeout.
	received := 0
	recvUntil := func(target int) {
		t.Helper()
		for ; received < target; received++ {
			select {
			case msg, ok := <-pc.Messages():
				if !ok {
					t.Fatalf("messages channel closed after receiving %d of %d messages", received, numMsg)
				}
				if msg.Offset%50 == 0 {
					t.Logf("recv: %d\n", msg.Offset)
				}
			case <-time.After(recvTimeout):
				t.Fatalf("timed out after %s waiting for message %d of %d", recvTimeout, received+1, numMsg)
			}
		}
	}

	recvUntil(numMsg / 8)

	if err := stopDockerTestBroker(context.Background(), follower); err != nil {
		t.Fatal(err)
	}
	// make sure the broker is brought back even if the test fails while it is
	// down, so that later functional tests see a complete cluster
	brokerDown := true
	defer func() {
		if !brokerDown {
			return
		}
		if err := startDockerTestBroker(context.Background(), follower); err != nil {
			t.Logf("cleanup could not restart kafka-%d: %v", follower, err)
		}
	}()

	recvUntil(numMsg / 3)

	if err := startDockerTestBroker(context.Background(), follower); err != nil {
		t.Fatal(err)
	}
	brokerDown = false

	recvUntil(numMsg)

	// wait for the producer to finish sending every message
	wg.Wait()
}
23 changes: 23 additions & 0 deletions functional_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,7 @@ func existingEnvironment(ctx context.Context, env *testEnvironment) (bool, error
toxiproxyHost := toxiproxyURL.Hostname()

env.ToxiproxyClient = toxiproxy.NewClient(toxiproxyAddr)
env.Proxies = map[string]*toxiproxy.Proxy{}
for i := 1; i <= 5; i++ {
proxyName := fmt.Sprintf("kafka%d", i)
proxy, err := env.ToxiproxyClient.Proxy(proxyName)
Expand Down Expand Up @@ -262,6 +263,28 @@ func tearDownDockerTestEnvironment(ctx context.Context, env *testEnvironment) er
return nil
}

// startDockerTestBroker runs `docker-compose start kafka-<brokerID>` and waits
// for the command to exit, forwarding its output to os.Stdout and os.Stderr.
//
// The command is bound to ctx, so cancelling ctx (or its deadline expiring)
// kills the docker-compose process. ctx must be non-nil.
//
// It returns a wrapped error if the command cannot be run or exits non-zero.
func startDockerTestBroker(ctx context.Context, brokerID int32) error {
	service := fmt.Sprintf("kafka-%d", brokerID)
	c := exec.CommandContext(ctx, "docker-compose", "start", service)
	c.Stdout = os.Stdout
	c.Stderr = os.Stderr
	if err := c.Run(); err != nil {
		return fmt.Errorf("failed to run docker-compose to start test broker kafka-%d: %w", brokerID, err)
	}
	return nil
}

// stopDockerTestBroker runs `docker-compose stop kafka-<brokerID>` and waits
// for the command to exit, forwarding its output to os.Stdout and os.Stderr.
//
// The command is bound to ctx, so cancelling ctx (or its deadline expiring)
// kills the docker-compose process. ctx must be non-nil.
//
// It returns a wrapped error if the command cannot be run or exits non-zero.
func stopDockerTestBroker(ctx context.Context, brokerID int32) error {
	service := fmt.Sprintf("kafka-%d", brokerID)
	c := exec.CommandContext(ctx, "docker-compose", "stop", service)
	c.Stdout = os.Stdout
	c.Stderr = os.Stderr
	if err := c.Run(); err != nil {
		return fmt.Errorf("failed to run docker-compose to stop test broker kafka-%d: %w", brokerID, err)
	}
	return nil
}

func prepareTestTopics(ctx context.Context, env *testEnvironment) error {
Logger.Println("creating test topics")
var testTopicNames []string
Expand Down

0 comments on commit a059adb

Please sign in to comment.