Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

v1.30.1 patched #1

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 12 additions & 2 deletions consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -351,7 +351,6 @@ func (child *partitionConsumer) dispatcher() {
child.broker = nil
}

Logger.Printf("consumer/%s/%d finding new broker\n", child.topic, child.partition)
if err := child.dispatch(); err != nil {
child.sendError(err)
child.trigger <- none{}
Expand All @@ -372,6 +371,14 @@ func (child *partitionConsumer) preferredBroker() (*Broker, error) {
if err == nil {
return broker, nil
}
Logger.Printf(
"consumer/%s/%d failed to find active broker for preferred read replica %d - will fallback to leader",
child.topic, child.partition, child.preferredReadReplica)

// if we couldn't find it, discard the replica preference and trigger a
// metadata refresh whilst falling back to consuming from the leader again
child.preferredReadReplica = invalidPreferredReplicaID
_ = child.consumer.client.RefreshMetadata(child.topic)
}

// if preferred replica cannot be found fallback to leader
Expand Down Expand Up @@ -853,6 +860,9 @@ func (bc *brokerConsumer) handleResponses() {
if preferredBroker, err := child.preferredBroker(); err == nil {
if bc.broker.ID() != preferredBroker.ID() {
// not an error but needs redispatching to consume from preferred replica
Logger.Printf(
"consumer/broker/%d abandoned in favor of preferred replica broker/%d\n",
bc.broker.ID(), preferredBroker.ID())
child.trigger <- none{}
delete(bc.subscriptions, child)
}
Expand All @@ -861,7 +871,7 @@ func (bc *brokerConsumer) handleResponses() {
}

// Discard any replica preference.
child.preferredReadReplica = -1
child.preferredReadReplica = invalidPreferredReplicaID

switch result {
case errTimedOut:
Expand Down
47 changes: 44 additions & 3 deletions consumer_group_members.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
package sarama

// ConsumerGroupMemberMetadata holds the metadata for consumer group
// https://github.com/apache/kafka/blob/trunk/clients/src/main/resources/common/message/ConsumerProtocolSubscription.json
type ConsumerGroupMemberMetadata struct {
Version int16
Topics []string
UserData []byte
Version int16
Topics []string
UserData []byte
OwnedPartitions []*OwnedPartition
}

func (m *ConsumerGroupMemberMetadata) encode(pe packetEncoder) error {
Expand Down Expand Up @@ -33,11 +35,50 @@ func (m *ConsumerGroupMemberMetadata) decode(pd packetDecoder) (err error) {
if m.UserData, err = pd.getBytes(); err != nil {
return
}
if m.Version >= 1 {
n, err := pd.getArrayLength()
if err != nil {
// permit missing data here in case of misbehaving 3rd party
// clients who incorrectly marked the member metadata as V1 in
// their JoinGroup request
if err == ErrInsufficientData {
return nil
}
return err
}
if n == 0 {
return nil
}
m.OwnedPartitions = make([]*OwnedPartition, n)
for i := 0; i < n; i++ {
m.OwnedPartitions[i] = &OwnedPartition{}
if err := m.OwnedPartitions[i].decode(pd); err != nil {
return err
}
}
}

return nil
}

type OwnedPartition struct {
Topic string
Partitions []int32
}

func (m *OwnedPartition) decode(pd packetDecoder) (err error) {
if m.Topic, err = pd.getString(); err != nil {
return err
}
if m.Partitions, err = pd.getInt32Array(); err != nil {
return err
}

return nil
}

// ConsumerGroupMemberAssignment holds the member assignment for a consume group
// https://github.com/apache/kafka/blob/trunk/clients/src/main/resources/common/message/ConsumerProtocolAssignment.json
type ConsumerGroupMemberAssignment struct {
Version int16
Topics map[string][]int32
Expand Down
50 changes: 40 additions & 10 deletions consumer_group_members_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,35 +7,55 @@ import (
)

var (
groupMemberMetadata = []byte{
0, 1, // Version
groupMemberMetadataV0 = []byte{
0, 0, // Version
0, 0, 0, 2, // Topic array length
0, 3, 'o', 'n', 'e', // Topic one
0, 3, 't', 'w', 'o', // Topic two
0, 0, 0, 3, 0x01, 0x02, 0x03, // Userdata
}
groupMemberAssignment = []byte{
0, 1, // Version
groupMemberAssignmentV0 = []byte{
0, 0, // Version
0, 0, 0, 1, // Topic array length
0, 3, 'o', 'n', 'e', // Topic one
0, 0, 0, 3, // Topic one, partition array length
0, 0, 0, 0, 0, 0, 0, 2, 0, 0, 0, 4, // 0, 2, 4
0, 0, 0, 3, 0x01, 0x02, 0x03, // Userdata
}

// notably it looks like the old 3rdparty bsm/sarama-cluster incorrectly
// set V1 in the member metadata when it sent the JoinGroup request so
// we need to cope with that one being too short
groupMemberMetadataV1Bad = []byte{
0, 1, // Version
0, 0, 0, 2, // Topic array length
0, 3, 'o', 'n', 'e', // Topic one
0, 3, 't', 'w', 'o', // Topic two
0, 0, 0, 3, 0x01, 0x02, 0x03, // Userdata
}

groupMemberMetadataV1 = []byte{
0, 1, // Version
0, 0, 0, 2, // Topic array length
0, 3, 'o', 'n', 'e', // Topic one
0, 3, 't', 'w', 'o', // Topic two
0, 0, 0, 3, 0x01, 0x02, 0x03, // Userdata
0, 0, 0, 0, // OwnedPartitions KIP-429
}
)

func TestConsumerGroupMemberMetadata(t *testing.T) {
meta := &ConsumerGroupMemberMetadata{
Version: 1,
Version: 0,
Topics: []string{"one", "two"},
UserData: []byte{0x01, 0x02, 0x03},
}

buf, err := encode(meta, nil)
if err != nil {
t.Error("Failed to encode data", err)
} else if !bytes.Equal(groupMemberMetadata, buf) {
t.Errorf("Encoded data does not match expectation\nexpected: %v\nactual: %v", groupMemberMetadata, buf)
} else if !bytes.Equal(groupMemberMetadataV0, buf) {
t.Errorf("Encoded data does not match expectation\nexpected: %v\nactual: %v", groupMemberMetadataV0, buf)
}

meta2 := new(ConsumerGroupMemberMetadata)
Expand All @@ -47,9 +67,19 @@ func TestConsumerGroupMemberMetadata(t *testing.T) {
}
}

func TestConsumerGroupMemberMetadataV1Decode(t *testing.T) {
meta := new(ConsumerGroupMemberMetadata)
if err := decode(groupMemberMetadataV1, meta); err != nil {
t.Error("Failed to decode V1 data", err)
}
if err := decode(groupMemberMetadataV1Bad, meta); err != nil {
t.Error("Failed to decode V1 'bad' data", err)
}
}

func TestConsumerGroupMemberAssignment(t *testing.T) {
amt := &ConsumerGroupMemberAssignment{
Version: 1,
Version: 0,
Topics: map[string][]int32{
"one": {0, 2, 4},
},
Expand All @@ -59,8 +89,8 @@ func TestConsumerGroupMemberAssignment(t *testing.T) {
buf, err := encode(amt, nil)
if err != nil {
t.Error("Failed to encode data", err)
} else if !bytes.Equal(groupMemberAssignment, buf) {
t.Errorf("Encoded data does not match expectation\nexpected: %v\nactual: %v", groupMemberAssignment, buf)
} else if !bytes.Equal(groupMemberAssignmentV0, buf) {
t.Errorf("Encoded data does not match expectation\nexpected: %v\nactual: %v", groupMemberAssignmentV0, buf)
}

amt2 := new(ConsumerGroupMemberAssignment)
Expand Down
6 changes: 6 additions & 0 deletions describe_groups_response.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,12 +179,18 @@ func (gmd *GroupMemberDescription) decode(pd packetDecoder) (err error) {
}

func (gmd *GroupMemberDescription) GetMemberAssignment() (*ConsumerGroupMemberAssignment, error) {
if len(gmd.MemberAssignment) == 0 {
return nil, nil
}
assignment := new(ConsumerGroupMemberAssignment)
err := decode(gmd.MemberAssignment, assignment)
return assignment, err
}

func (gmd *GroupMemberDescription) GetMemberMetadata() (*ConsumerGroupMemberMetadata, error) {
if len(gmd.MemberMetadata) == 0 {
return nil, nil
}
metadata := new(ConsumerGroupMemberMetadata)
err := decode(gmd.MemberMetadata, metadata)
return metadata, err
Expand Down
127 changes: 127 additions & 0 deletions functional_consumer_follower_fetch_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
//go:build functional
// +build functional

package sarama

import (
"context"
"fmt"
"strconv"
"sync"
"testing"
"time"
)

func TestConsumerFetchFollowerFailover(t *testing.T) {
const (
topic = "test.1"
numMsg = 1000
)

newConfig := func() *Config {
config := NewConfig()
config.ClientID = t.Name()
config.Version = V2_8_0_0
config.Producer.Return.Successes = true
return config
}

config := newConfig()

// pick a partition and find the ID for one of the follower brokers
admin, err := NewClusterAdmin(FunctionalTestEnv.KafkaBrokerAddrs, config)
if err != nil {
t.Fatal(err)
}
defer admin.Close()

metadata, err := admin.DescribeTopics([]string{topic})
if err != nil {
t.Fatal(err)
}
partition := metadata[0].Partitions[0]
leader := metadata[0].Partitions[0].Leader
follower := int32(-1)
for _, replica := range partition.Replicas {
if replica == leader {
continue
}
follower = replica
break
}

t.Logf("topic %s has leader kafka-%d and our chosen follower is kafka-%d", topic, leader, follower)

// match our clientID to the given broker so our requests should end up fetching from that follower
config.RackID = strconv.FormatInt(int64(follower), 10)

consumer, err := NewConsumer(FunctionalTestEnv.KafkaBrokerAddrs, config)
if err != nil {
t.Fatal(err)
}

pc, err := consumer.ConsumePartition(topic, partition.ID, OffsetOldest)
if err != nil {
t.Fatal(err)
}
defer func() {
pc.Close()
consumer.Close()
}()

producer, err := NewSyncProducer(FunctionalTestEnv.KafkaBrokerAddrs, config)
if err != nil {
t.Fatal(err)
}
defer producer.Close()

var wg sync.WaitGroup
wg.Add(numMsg)

go func() {
for i := 0; i < numMsg; i++ {
msg := &ProducerMessage{
Topic: topic, Key: nil, Value: StringEncoder(fmt.Sprintf("%s %-3d", t.Name(), i))}
if _, offset, err := producer.SendMessage(msg); err != nil {
t.Error(i, err)
} else if offset%50 == 0 {
t.Logf("sent: %d\n", offset)
}
wg.Done()
time.Sleep(time.Millisecond * 25)
}
}()

i := 0

for ; i < numMsg/8; i++ {
msg := <-pc.Messages()
if msg.Offset%50 == 0 {
t.Logf("recv: %d\n", msg.Offset)
}
}

if err := stopDockerTestBroker(context.Background(), follower); err != nil {
t.Fatal(err)
}

for ; i < numMsg/3; i++ {
msg := <-pc.Messages()
if msg.Offset%50 == 0 {
t.Logf("recv: %d\n", msg.Offset)
}
}

if err := startDockerTestBroker(context.Background(), follower); err != nil {
t.Fatal(err)
}

for ; i < numMsg; i++ {
msg := <-pc.Messages()
if msg.Offset%50 == 0 {
t.Logf("recv: %d\n", msg.Offset)
}
}

wg.Wait()
}
23 changes: 23 additions & 0 deletions functional_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,7 @@ func existingEnvironment(ctx context.Context, env *testEnvironment) (bool, error
toxiproxyHost := toxiproxyURL.Hostname()

env.ToxiproxyClient = toxiproxy.NewClient(toxiproxyAddr)
env.Proxies = map[string]*toxiproxy.Proxy{}
for i := 1; i <= 5; i++ {
proxyName := fmt.Sprintf("kafka%d", i)
proxy, err := env.ToxiproxyClient.Proxy(proxyName)
Expand Down Expand Up @@ -262,6 +263,28 @@ func tearDownDockerTestEnvironment(ctx context.Context, env *testEnvironment) er
return nil
}

func startDockerTestBroker(ctx context.Context, brokerID int32) error {
service := fmt.Sprintf("kafka-%d", brokerID)
c := exec.Command("docker-compose", "start", service)
c.Stdout = os.Stdout
c.Stderr = os.Stderr
if err := c.Run(); err != nil {
return fmt.Errorf("failed to run docker-compose to start test broker kafka-%d: %w", brokerID, err)
}
return nil
}

func stopDockerTestBroker(ctx context.Context, brokerID int32) error {
service := fmt.Sprintf("kafka-%d", brokerID)
c := exec.Command("docker-compose", "stop", service)
c.Stdout = os.Stdout
c.Stderr = os.Stderr
if err := c.Run(); err != nil {
return fmt.Errorf("failed to run docker-compose to stop test broker kafka-%d: %w", brokerID, err)
}
return nil
}

func prepareTestTopics(ctx context.Context, env *testEnvironment) error {
Logger.Println("creating test topics")
var testTopicNames []string
Expand Down