From 98e05846e8ad7e25b17d4dfaf3c405810c7b8050 Mon Sep 17 00:00:00 2001 From: "Deepak.Neralla" Date: Mon, 11 May 2020 01:50:49 +0000 Subject: [PATCH 1/2] KIP-392: Consumer support for sarama --- client.go | 14 ++++++++++ client_test.go | 36 ++++++++++++++++++++++++++ consumer.go | 44 +++++++++++++++++++++++-------- consumer_test.go | 67 ++++++++++++++++++++++++++++++++++++++++++++++++ errors.go | 3 +++ 5 files changed, 153 insertions(+), 11 deletions(-) diff --git a/client.go b/client.go index c3392f961..3c187c0af 100644 --- a/client.go +++ b/client.go @@ -29,6 +29,9 @@ type Client interface { // Brokers returns the current set of active brokers as retrieved from cluster metadata. Brokers() []*Broker + // Broker returns the active Broker if available for the brokerID + Broker(brokerID int32) (*Broker, error) + // Topics returns the set of available topics as retrieved from cluster metadata. Topics() ([]string, error) @@ -196,6 +199,17 @@ func (client *client) Brokers() []*Broker { return brokers } +func (client *client) Broker(brokerID int32) (*Broker, error) { + client.lock.RLock() + defer client.lock.RUnlock() + broker, ok := client.brokers[brokerID] + if !ok { + return nil, ErrBrokerNotFound + } + _ = broker.Open(client.conf) + return broker, nil +} + func (client *client) InitProducerID() (*InitProducerIDResponse, error) { var err error for broker := client.any(); broker != nil; broker = client.any() { diff --git a/client_test.go b/client_test.go index 9eee454da..83e788a2a 100644 --- a/client_test.go +++ b/client_test.go @@ -545,6 +545,42 @@ func TestClientRefreshMetadataBrokerOffline(t *testing.T) { } } +func TestClientGetBroker(t *testing.T) { + seedBroker := NewMockBroker(t, 1) + leader := NewMockBroker(t, 5) + + metadataResponse1 := new(MetadataResponse) + metadataResponse1.AddBroker(leader.Addr(), leader.BrokerID()) + metadataResponse1.AddBroker(seedBroker.Addr(), seedBroker.BrokerID()) + seedBroker.Returns(metadataResponse1) + + client, err := NewClient([]string{seedBroker.Addr()}, nil) + if err != nil { + t.Fatal(err) + } + + broker, err := client.Broker(leader.BrokerID()) + if err != nil { + t.Fatal(err) + } + + if broker.Addr() != leader.Addr() { + t.Errorf("Expected broker to have address %s, found %s", leader.Addr(), broker.Addr()) + } + + metadataResponse2 := new(MetadataResponse) + metadataResponse2.AddBroker(seedBroker.Addr(), seedBroker.BrokerID()) + seedBroker.Returns(metadataResponse2) + + if err := client.RefreshMetadata(); err != nil { + t.Error(err) + } + broker, err = client.Broker(leader.BrokerID()) + if err != ErrBrokerNotFound { + t.Errorf("Expected Broker(brokerID) to return %v found %v", ErrBrokerNotFound, err) + } +} + func TestClientResurrectDeadSeeds(t *testing.T) { initialSeed := NewMockBroker(t, 0) emptyMetadata := new(MetadataResponse) diff --git a/consumer.go b/consumer.go index e16d08aa9..dfd58e77b 100644 --- a/consumer.go +++ b/consumer.go @@ -299,6 +299,9 @@ type partitionConsumer struct { errors chan *ConsumerError feeder chan *FetchResponse + replicaInited bool + preferredReadReplica int32 + trigger, dying chan none closeOnce sync.Once topic string @@ -359,21 +362,29 @@ func (child *partitionConsumer) dispatcher() { close(child.feeder) } +func (child *partitionConsumer) preferedBroker() (*Broker, error) { + if child.replicaInited { + broker, err := child.consumer.client.Broker(child.preferredReadReplica) + if err == nil { + return broker, nil + } + } + + // if prefered replica cannot be found fallback to leader + return child.consumer.client.Leader(child.topic, child.partition) +} + func (child *partitionConsumer) dispatch() error { if err := child.consumer.client.RefreshMetadata(child.topic); err != nil { return err } - - var leader *Broker + var node *Broker var err error - if leader, err = child.consumer.client.Leader(child.topic, child.partition); err != nil { + if node, err = child.preferedBroker(); err != nil { return err } - - child.broker = child.consumer.refBrokerConsumer(leader) - + child.broker = child.consumer.refBrokerConsumer(node) child.broker.input <- child - return nil } @@ -445,7 +456,6 @@ func (child *partitionConsumer) responseFeeder() { feederLoop: for response := range child.feeder { msgs, child.responseResult = child.parseResponse(response) - if child.responseResult == nil { atomic.StoreInt32(&child.retries, 0) } @@ -480,7 +490,6 @@ feederLoop: } } } - child.broker.acks.Done() } @@ -617,6 +626,12 @@ func (child *partitionConsumer) parseResponse(response *FetchResponse) ([]*Consu child.fetchSize = child.conf.Consumer.Fetch.Default atomic.StoreInt64(&child.highWaterMarkOffset, block.HighWaterMarkOffset) + if response.Version == 11 && len(child.consumer.conf.RackID) > 0 { + // we got a valid response with messages. update child's preferredReadReplica from the FetchResponseBlock + child.replicaInited = true + child.preferredReadReplica = block.PreferredReadReplica + } + // abortedProducerIDs contains producerID which message should be ignored as uncommitted // - producerID are added when the partitionConsumer iterate over the offset at which an aborted transaction begins (abortedTransaction.FirstOffset) // - producerID are removed when partitionConsumer iterate over an aborted controlRecord, meaning the aborted transaction for this producer is over @@ -815,10 +830,16 @@ func (bc *brokerConsumer) handleResponses() { for child := range bc.subscriptions { result := child.responseResult child.responseResult = nil - switch result { case nil: - // no-op + if !child.replicaInited { + return + } + if bc.broker.ID() != child.preferredReadReplica { + // not an error but needs redispatching to consume from prefered replica + child.trigger <- none{} + delete(bc.subscriptions, child) + } case errTimedOut: Logger.Printf("consumer/broker/%d abandoned subscription to %s/%d because consuming was taking too long\n", bc.broker.ID(), child.topic, child.partition) @@ -834,6 +855,7 @@ func (bc *brokerConsumer) handleResponses() { // not an error, but does need redispatching Logger.Printf("consumer/broker/%d abandoned subscription to %s/%d because %s\n", bc.broker.ID(), child.topic, child.partition, result) + child.replicaInited = false child.trigger <- none{} delete(bc.subscriptions, child) default: diff --git a/consumer_test.go b/consumer_test.go index d0617f2ab..9a75cf822 100644 --- a/consumer_test.go +++ b/consumer_test.go @@ -618,6 +618,73 @@ func TestConsumeMessageWithSessionIDs(t *testing.T) { } } +func TestConsumeMessagesFromReadReplica(t *testing.T) { + // Given + fetchResponse1 := &FetchResponse{Version: 11} + fetchResponse1.AddMessage("my_topic", 0, nil, testMsg, 1) + fetchResponse1.AddMessage("my_topic", 0, nil, testMsg, 2) + block1 := fetchResponse1.GetBlock("my_topic", 0) + block1.PreferredReadReplica = 1 + + fetchResponse2 := &FetchResponse{Version: 11} + fetchResponse2.AddMessage("my_topic", 0, nil, testMsg, 3) + fetchResponse2.AddMessage("my_topic", 0, nil, testMsg, 4) + block2 := fetchResponse2.GetBlock("my_topic", 0) + block2.PreferredReadReplica = 1 + + cfg := NewConfig() + cfg.Version = V2_3_0_0 + cfg.RackID = "consumer_rack" + + leader := NewMockBroker(t, 0) + broker0 := NewMockBroker(t, 1) + + leader.SetHandlerByMap(map[string]MockResponse{ + "MetadataRequest": NewMockMetadataResponse(t). + SetBroker(broker0.Addr(), broker0.BrokerID()). + SetBroker(leader.Addr(), leader.BrokerID()). + SetLeader("my_topic", 0, leader.BrokerID()), + "OffsetRequest": NewMockOffsetResponse(t). + SetVersion(1). + SetOffset("my_topic", 0, OffsetNewest, 1234). + SetOffset("my_topic", 0, OffsetOldest, 0), + "FetchRequest": NewMockSequence(fetchResponse1), + }) + + broker0.SetHandlerByMap(map[string]MockResponse{ + "MetadataRequest": NewMockMetadataResponse(t). + SetBroker(broker0.Addr(), broker0.BrokerID()). + SetBroker(leader.Addr(), leader.BrokerID()). + SetLeader("my_topic", 0, leader.BrokerID()), + "OffsetRequest": NewMockOffsetResponse(t). + SetVersion(1). + SetOffset("my_topic", 0, OffsetNewest, 1234). + SetOffset("my_topic", 0, OffsetOldest, 0), + "FetchRequest": NewMockSequence(fetchResponse2), + }) + + master, err := NewConsumer([]string{broker0.Addr()}, cfg) + if err != nil { + t.Fatal(err) + } + + // When + consumer, err := master.ConsumePartition("my_topic", 0, 1) + if err != nil { + t.Fatal(err) + } + + assertMessageOffset(t, <-consumer.Messages(), 1) + assertMessageOffset(t, <-consumer.Messages(), 2) + assertMessageOffset(t, <-consumer.Messages(), 3) + assertMessageOffset(t, <-consumer.Messages(), 4) + + safeClose(t, consumer) + safeClose(t, master) + broker0.Close() + leader.Close() +} + // It is fine if offsets of fetched messages are not sequential (although // strictly increasing!). func TestConsumerNonSequentialOffsets(t *testing.T) { diff --git a/errors.go b/errors.go index ca621b092..0427451d5 100644 --- a/errors.go +++ b/errors.go @@ -9,6 +9,9 @@ import ( // or otherwise failed to respond. var ErrOutOfBrokers = errors.New("kafka: client has run out of available brokers to talk to (Is your cluster reachable?)") +// ErrBrokerNotFound is returned when there's no broker found for the requested id. +var ErrBrokerNotFound = errors.New("kafka: broker for id is not found") + // ErrClosedClient is the error returned when a method is called on a client that has been closed. var ErrClosedClient = errors.New("kafka: tried to use a client that was closed") From f5275c211ce9b761a5e90d04c60762669938b02c Mon Sep 17 00:00:00 2001 From: Dan Peterson Date: Sat, 3 Oct 2020 18:50:11 -0300 Subject: [PATCH 2/2] partitionConsumer: continue preferred read replica changes Address comments on https://github.com/Shopify/sarama/pull/1696. Change to only adding and using a new preferredReadReplica field. Move setting of the preferredReadReplica field up in parseResponse. This causes the returned preferred read replica to be used even if there are no records in the fetch response. Set FetchResponse.PreferredReadReplica to -1 when decoding versions prior to 11. Add more tests. Change consumeMsgs in functional tests to use subtests to reduce number of outstanding connections. --- client.go | 2 +- client_test.go | 2 +- consumer.go | 43 ++++---- consumer_test.go | 215 +++++++++++++++++++++++++++++++++++- errors.go | 4 +- fetch_response.go | 2 + fetch_response_test.go | 12 ++ functional_consumer_test.go | 67 ++++++----- 8 files changed, 285 insertions(+), 62 deletions(-) diff --git a/client.go b/client.go index 9cc26294b..1f53887cd 100644 --- a/client.go +++ b/client.go @@ -29,7 +29,7 @@ type Client interface { // Brokers returns the current set of active brokers as retrieved from cluster metadata. Brokers() []*Broker - // Broker returns the active Broker if available for the brokerID + // Broker returns the active Broker if available for the broker ID. Broker(brokerID int32) (*Broker, error) // Topics returns the set of available topics as retrieved from cluster metadata. diff --git a/client_test.go b/client_test.go index 4e7c0eb97..512b0949a 100644 --- a/client_test.go +++ b/client_test.go @@ -585,7 +585,7 @@ func TestClientGetBroker(t *testing.T) { metadataResponse1.AddBroker(seedBroker.Addr(), seedBroker.BrokerID()) seedBroker.Returns(metadataResponse1) - client, err := NewClient([]string{seedBroker.Addr()}, nil) + client, err := NewClient([]string{seedBroker.Addr()}, NewTestConfig()) if err != nil { t.Fatal(err) } diff --git a/consumer.go b/consumer.go index 6001ac4ee..9bd8d1820 100644 --- a/consumer.go +++ b/consumer.go @@ -303,7 +303,6 @@ type partitionConsumer struct { errors chan *ConsumerError feeder chan *FetchResponse - replicaInited bool preferredReadReplica int32 trigger, dying chan none @@ -366,8 +365,8 @@ func (child *partitionConsumer) dispatcher() { close(child.feeder) } -func (child *partitionConsumer) preferedBroker() (*Broker, error) { - if child.replicaInited { +func (child *partitionConsumer) preferredBroker() (*Broker, error) { + if child.preferredReadReplica >= 0 { broker, err := child.consumer.client.Broker(child.preferredReadReplica) if err == nil { return broker, nil @@ -382,13 +381,16 @@ func (child *partitionConsumer) dispatch() error { if err := child.consumer.client.RefreshMetadata(child.topic); err != nil { return err } - var node *Broker - var err error - if node, err = child.preferedBroker(); err != nil { + + broker, err := child.preferredBroker() + if err != nil { return err } - child.broker = child.consumer.refBrokerConsumer(node) + + child.broker = child.consumer.refBrokerConsumer(broker) + child.broker.input <- child + return nil } @@ -460,6 +462,7 @@ func (child *partitionConsumer) responseFeeder() { feederLoop: for response := range child.feeder { msgs, child.responseResult = child.parseResponse(response) + if child.responseResult == nil { atomic.StoreInt32(&child.retries, 0) } @@ -497,6 +500,7 @@ feederLoop: } } } + child.broker.acks.Done() } @@ -602,6 +606,8 @@ func (child *partitionConsumer) parseResponse(response *FetchResponse) ([]*Consu consumerBatchSizeMetric.Update(int64(nRecs)) + child.preferredReadReplica = block.PreferredReadReplica + if nRecs == 0 { partialTrailingMessage, err := block.isPartial() if err != nil { @@ -633,12 +639,6 @@ func (child *partitionConsumer) parseResponse(response *FetchResponse) ([]*Consu child.fetchSize = child.conf.Consumer.Fetch.Default atomic.StoreInt64(&child.highWaterMarkOffset, block.HighWaterMarkOffset) - if response.Version == 11 && len(child.consumer.conf.RackID) > 0 { - // we got a valid response with messages. update child's preferredReadReplica from the FetchResponseBlock - child.replicaInited = true - child.preferredReadReplica = block.PreferredReadReplica - } - // abortedProducerIDs contains producerID which message should be ignored as uncommitted // - producerID are added when the partitionConsumer iterate over the offset at which an aborted transaction begins (abortedTransaction.FirstOffset) // - producerID are removed when partitionConsumer iterate over an aborted controlRecord, meaning the aborted transaction for this producer is over @@ -837,16 +837,20 @@ func (bc *brokerConsumer) handleResponses() { for child := range bc.subscriptions { result := child.responseResult child.responseResult = nil - switch result { - case nil: - if !child.replicaInited { - return - } - if bc.broker.ID() != child.preferredReadReplica { + + if result == nil { + if child.preferredReadReplica >= 0 && bc.broker.ID() != child.preferredReadReplica { // not an error but needs redispatching to consume from prefered replica child.trigger <- none{} delete(bc.subscriptions, child) } + continue + } + + // Discard any replica preference. + child.preferredReadReplica = -1 + + switch result { case errTimedOut: Logger.Printf("consumer/broker/%d abandoned subscription to %s/%d because consuming was taking too long\n", bc.broker.ID(), child.topic, child.partition) @@ -862,7 +866,6 @@ func (bc *brokerConsumer) handleResponses() { // not an error, but does need redispatching Logger.Printf("consumer/broker/%d abandoned subscription to %s/%d because %s\n", bc.broker.ID(), child.topic, child.partition, result) - child.replicaInited = false child.trigger <- none{} delete(bc.subscriptions, child) default: diff --git a/consumer_test.go b/consumer_test.go index 525acf533..4c8aee187 100644 --- a/consumer_test.go +++ b/consumer_test.go @@ -625,13 +625,148 @@ func TestConsumeMessagesFromReadReplica(t *testing.T) { fetchResponse1.AddMessage("my_topic", 0, nil, testMsg, 1) fetchResponse1.AddMessage("my_topic", 0, nil, testMsg, 2) block1 := fetchResponse1.GetBlock("my_topic", 0) - block1.PreferredReadReplica = 1 + block1.PreferredReadReplica = -1 + + fetchResponse2 := &FetchResponse{Version: 11} + // Create a block with no records. + block2 := fetchResponse1.getOrCreateBlock("my_topic", 0) + block2.PreferredReadReplica = 1 + + fetchResponse3 := &FetchResponse{Version: 11} + fetchResponse3.AddMessage("my_topic", 0, nil, testMsg, 3) + fetchResponse3.AddMessage("my_topic", 0, nil, testMsg, 4) + block3 := fetchResponse3.GetBlock("my_topic", 0) + block3.PreferredReadReplica = -1 + + fetchResponse4 := &FetchResponse{Version: 11} + fetchResponse4.AddMessage("my_topic", 0, nil, testMsg, 5) + fetchResponse4.AddMessage("my_topic", 0, nil, testMsg, 6) + block4 := fetchResponse4.GetBlock("my_topic", 0) + block4.PreferredReadReplica = -1 + + cfg := NewConfig() + cfg.Version = V2_3_0_0 + cfg.RackID = "consumer_rack" + + leader := NewMockBroker(t, 0) + broker0 := NewMockBroker(t, 1) + + leader.SetHandlerByMap(map[string]MockResponse{ + "MetadataRequest": NewMockMetadataResponse(t). + SetBroker(broker0.Addr(), broker0.BrokerID()). + SetBroker(leader.Addr(), leader.BrokerID()). + SetLeader("my_topic", 0, leader.BrokerID()), + "OffsetRequest": NewMockOffsetResponse(t). + SetVersion(1). + SetOffset("my_topic", 0, OffsetNewest, 1234). + SetOffset("my_topic", 0, OffsetOldest, 0), + "FetchRequest": NewMockSequence(fetchResponse1, fetchResponse2), + }) + + broker0.SetHandlerByMap(map[string]MockResponse{ + "MetadataRequest": NewMockMetadataResponse(t). + SetBroker(broker0.Addr(), broker0.BrokerID()). + SetBroker(leader.Addr(), leader.BrokerID()). + SetLeader("my_topic", 0, leader.BrokerID()), + "OffsetRequest": NewMockOffsetResponse(t). + SetVersion(1). + SetOffset("my_topic", 0, OffsetNewest, 1234). + SetOffset("my_topic", 0, OffsetOldest, 0), + "FetchRequest": NewMockSequence(fetchResponse3, fetchResponse4), + }) + + master, err := NewConsumer([]string{broker0.Addr()}, cfg) + if err != nil { + t.Fatal(err) + } + + // When + consumer, err := master.ConsumePartition("my_topic", 0, 1) + if err != nil { + t.Fatal(err) + } + + assertMessageOffset(t, <-consumer.Messages(), 1) + assertMessageOffset(t, <-consumer.Messages(), 2) + assertMessageOffset(t, <-consumer.Messages(), 3) + assertMessageOffset(t, <-consumer.Messages(), 4) + + safeClose(t, consumer) + safeClose(t, master) + broker0.Close() + leader.Close() +} + +func TestConsumeMessagesFromReadReplicaLeaderFallback(t *testing.T) { + // Given + fetchResponse1 := &FetchResponse{Version: 11} + fetchResponse1.AddMessage("my_topic", 0, nil, testMsg, 1) + fetchResponse1.AddMessage("my_topic", 0, nil, testMsg, 2) + block1 := fetchResponse1.GetBlock("my_topic", 0) + block1.PreferredReadReplica = 5 // Does not exist. fetchResponse2 := &FetchResponse{Version: 11} fetchResponse2.AddMessage("my_topic", 0, nil, testMsg, 3) fetchResponse2.AddMessage("my_topic", 0, nil, testMsg, 4) block2 := fetchResponse2.GetBlock("my_topic", 0) - block2.PreferredReadReplica = 1 + block2.PreferredReadReplica = -1 + + cfg := NewConfig() + cfg.Version = V2_3_0_0 + cfg.RackID = "consumer_rack" + + leader := NewMockBroker(t, 0) + + leader.SetHandlerByMap(map[string]MockResponse{ + "MetadataRequest": NewMockMetadataResponse(t). + SetBroker(leader.Addr(), leader.BrokerID()). + SetLeader("my_topic", 0, leader.BrokerID()), + "OffsetRequest": NewMockOffsetResponse(t). + SetVersion(1). + SetOffset("my_topic", 0, OffsetNewest, 1234). + SetOffset("my_topic", 0, OffsetOldest, 0), + "FetchRequest": NewMockSequence(fetchResponse1, fetchResponse2), + }) + + master, err := NewConsumer([]string{leader.Addr()}, cfg) + if err != nil { + t.Fatal(err) + } + + // When + consumer, err := master.ConsumePartition("my_topic", 0, 1) + if err != nil { + t.Fatal(err) + } + + assertMessageOffset(t, <-consumer.Messages(), 1) + assertMessageOffset(t, <-consumer.Messages(), 2) + assertMessageOffset(t, <-consumer.Messages(), 3) + assertMessageOffset(t, <-consumer.Messages(), 4) + + safeClose(t, consumer) + safeClose(t, master) + leader.Close() +} + +func TestConsumeMessagesFromReadReplicaErrorReplicaNotAvailable(t *testing.T) { + // Given + fetchResponse1 := &FetchResponse{Version: 11} + block1 := fetchResponse1.getOrCreateBlock("my_topic", 0) + block1.PreferredReadReplica = 1 + + fetchResponse2 := &FetchResponse{Version: 11} + fetchResponse2.AddMessage("my_topic", 0, nil, testMsg, 1) + fetchResponse2.AddMessage("my_topic", 0, nil, testMsg, 2) + block2 := fetchResponse2.GetBlock("my_topic", 0) + block2.PreferredReadReplica = -1 + + fetchResponse3 := &FetchResponse{Version: 11} + fetchResponse3.AddError("my_topic", 0, ErrReplicaNotAvailable) + + fetchResponse4 := &FetchResponse{Version: 11} + fetchResponse4.AddMessage("my_topic", 0, nil, testMsg, 3) + fetchResponse4.AddMessage("my_topic", 0, nil, testMsg, 4) cfg := NewConfig() cfg.Version = V2_3_0_0 @@ -649,7 +784,79 @@ func TestConsumeMessagesFromReadReplica(t *testing.T) { SetVersion(1). SetOffset("my_topic", 0, OffsetNewest, 1234). SetOffset("my_topic", 0, OffsetOldest, 0), - "FetchRequest": NewMockSequence(fetchResponse1), + "FetchRequest": NewMockSequence(fetchResponse1, fetchResponse4), + }) + + broker0.SetHandlerByMap(map[string]MockResponse{ + "MetadataRequest": NewMockMetadataResponse(t). + SetBroker(broker0.Addr(), broker0.BrokerID()). + SetBroker(leader.Addr(), leader.BrokerID()). + SetLeader("my_topic", 0, leader.BrokerID()), + "OffsetRequest": NewMockOffsetResponse(t). + SetVersion(1). + SetOffset("my_topic", 0, OffsetNewest, 1234). + SetOffset("my_topic", 0, OffsetOldest, 0), + "FetchRequest": NewMockSequence(fetchResponse2, fetchResponse3), + }) + + master, err := NewConsumer([]string{broker0.Addr()}, cfg) + if err != nil { + t.Fatal(err) + } + + // When + consumer, err := master.ConsumePartition("my_topic", 0, 1) + if err != nil { + t.Fatal(err) + } + + assertMessageOffset(t, <-consumer.Messages(), 1) + assertMessageOffset(t, <-consumer.Messages(), 2) + assertMessageOffset(t, <-consumer.Messages(), 3) + assertMessageOffset(t, <-consumer.Messages(), 4) + + safeClose(t, consumer) + safeClose(t, master) + broker0.Close() + leader.Close() +} + +func TestConsumeMessagesFromReadReplicaErrorUnknown(t *testing.T) { + // Given + fetchResponse1 := &FetchResponse{Version: 11} + block1 := fetchResponse1.getOrCreateBlock("my_topic", 0) + block1.PreferredReadReplica = 1 + + fetchResponse2 := &FetchResponse{Version: 11} + fetchResponse2.AddMessage("my_topic", 0, nil, testMsg, 1) + fetchResponse2.AddMessage("my_topic", 0, nil, testMsg, 2) + block2 := fetchResponse2.GetBlock("my_topic", 0) + block2.PreferredReadReplica = -1 + + fetchResponse3 := &FetchResponse{Version: 11} + fetchResponse3.AddError("my_topic", 0, ErrUnknown) + + fetchResponse4 := &FetchResponse{Version: 11} + fetchResponse4.AddMessage("my_topic", 0, nil, testMsg, 3) + fetchResponse4.AddMessage("my_topic", 0, nil, testMsg, 4) + + cfg := NewConfig() + cfg.Version = V2_3_0_0 + cfg.RackID = "consumer_rack" + + leader := NewMockBroker(t, 0) + broker0 := NewMockBroker(t, 1) + + leader.SetHandlerByMap(map[string]MockResponse{ + "MetadataRequest": NewMockMetadataResponse(t). + SetBroker(broker0.Addr(), broker0.BrokerID()). + SetBroker(leader.Addr(), leader.BrokerID()). + SetLeader("my_topic", 0, leader.BrokerID()), + "OffsetRequest": NewMockOffsetResponse(t). + SetVersion(1). + SetOffset("my_topic", 0, OffsetNewest, 1234). + SetOffset("my_topic", 0, OffsetOldest, 0), + "FetchRequest": NewMockSequence(fetchResponse1, fetchResponse4), }) broker0.SetHandlerByMap(map[string]MockResponse{ @@ -661,7 +868,7 @@ func TestConsumeMessagesFromReadReplica(t *testing.T) { SetVersion(1). SetOffset("my_topic", 0, OffsetNewest, 1234). SetOffset("my_topic", 0, OffsetOldest, 0), - "FetchRequest": NewMockSequence(fetchResponse2), + "FetchRequest": NewMockSequence(fetchResponse2, fetchResponse3), }) master, err := NewConsumer([]string{broker0.Addr()}, cfg) diff --git a/errors.go b/errors.go index 0427451d5..5781c1c0c 100644 --- a/errors.go +++ b/errors.go @@ -9,8 +9,8 @@ import ( // or otherwise failed to respond. var ErrOutOfBrokers = errors.New("kafka: client has run out of available brokers to talk to (Is your cluster reachable?)") -// ErrBrokerNotFound is returned when there's no broker found for the requested id. -var ErrBrokerNotFound = errors.New("kafka: broker for id is not found") +// ErrBrokerNotFound is the error returned when there's no broker found for the requested ID. +var ErrBrokerNotFound = errors.New("kafka: broker for ID is not found") // ErrClosedClient is the error returned when a method is called on a client that has been closed. var ErrClosedClient = errors.New("kafka: tried to use a client that was closed") diff --git a/fetch_response.go b/fetch_response.go index ca6d78832..54b88284a 100644 --- a/fetch_response.go +++ b/fetch_response.go @@ -89,6 +89,8 @@ func (b *FetchResponseBlock) decode(pd packetDecoder, version int16) (err error) if err != nil { return err } + } else { + b.PreferredReadReplica = -1 } recordsSize, err := pd.getInt32() diff --git a/fetch_response_test.go b/fetch_response_test.go index e47158160..99ad769c6 100644 --- a/fetch_response_test.go +++ b/fetch_response_test.go @@ -195,6 +195,9 @@ func TestOneMessageFetchResponse(t *testing.T) { if block.HighWaterMarkOffset != 0x10101010 { t.Error("Decoding didn't produce correct high water mark offset.") } + if block.PreferredReadReplica != -1 { + t.Error("Decoding didn't produce correct preferred read replica.") + } partial, err := block.isPartial() if err != nil { t.Fatalf("Unexpected error: %v", err) @@ -308,6 +311,9 @@ func TestOneRecordFetchResponse(t *testing.T) { if block.HighWaterMarkOffset != 0x10101010 { t.Error("Decoding didn't produce correct high water mark offset.") } + if block.PreferredReadReplica != -1 { + t.Error("Decoding didn't produce correct preferred read replica.") + } partial, err := block.isPartial() if err != nil { t.Fatalf("Unexpected error: %v", err) @@ -354,6 +360,9 @@ func TestPartailFetchResponse(t *testing.T) { if block.HighWaterMarkOffset != 0x10101010 { t.Error("Decoding didn't produce correct high water mark offset.") } + if block.PreferredReadReplica != -1 { + t.Error("Decoding didn't produce correct preferred read replica.") + } partial, err := block.isPartial() if err != nil { t.Fatalf("Unexpected error: %v", err) @@ -393,6 +402,9 @@ func TestOneMessageFetchResponseV4(t *testing.T) { if block.HighWaterMarkOffset != 0x10101010 { t.Error("Decoding didn't produce correct high water mark offset.") } + if block.PreferredReadReplica != -1 { + t.Error("Decoding didn't produce correct preferred read replica.") + } partial, err := block.isPartial() if err != nil { t.Fatalf("Unexpected error: %v", err) diff --git a/functional_consumer_test.go b/functional_consumer_test.go index 0ec993087..6364f5b95 100644 --- a/functional_consumer_test.go +++ b/functional_consumer_test.go @@ -246,42 +246,41 @@ func produceMsgs(t *testing.T, clientVersions []KafkaVersion, codecs []Compressi func consumeMsgs(t *testing.T, clientVersions []KafkaVersion, producedMessages []*ProducerMessage) { // Consume all produced messages with all client versions supported by the // cluster. -consumerVersionLoop: for _, consVer := range clientVersions { - t.Logf("*** Consuming with client version %s\n", consVer) - // Create a partition consumer that should start from the first produced - // message. - consCfg := NewTestConfig() - consCfg.Version = consVer - c, err := NewConsumer(FunctionalTestEnv.KafkaBrokerAddrs, consCfg) - if err != nil { - t.Fatal(err) - } - defer safeClose(t, c) - pc, err := c.ConsumePartition("test.1", 0, producedMessages[0].Offset) - if err != nil { - t.Fatal(err) - } - defer safeClose(t, pc) - - // Consume as many messages as there have been produced and make sure that - // order is preserved. - for i, prodMsg := range producedMessages { - select { - case consMsg := <-pc.Messages(): - if consMsg.Offset != prodMsg.Offset { - t.Errorf("Consumed unexpected offset: version=%s, index=%d, want=%s, got=%s", - consVer, i, prodMsg2Str(prodMsg), consMsg2Str(consMsg)) - continue consumerVersionLoop - } - if string(consMsg.Value) != string(prodMsg.Value.(StringEncoder)) { - t.Errorf("Consumed unexpected msg: version=%s, index=%d, want=%s, got=%s", - consVer, i, prodMsg2Str(prodMsg), consMsg2Str(consMsg)) - continue consumerVersionLoop + t.Run(consVer.String(), func(t *testing.T) { + t.Logf("*** Consuming with client version %s\n", consVer) + // Create a partition consumer that should start from the first produced + // message. + consCfg := NewTestConfig() + consCfg.Version = consVer + c, err := NewConsumer(FunctionalTestEnv.KafkaBrokerAddrs, consCfg) + if err != nil { + t.Fatal(err) + } + defer safeClose(t, c) + pc, err := c.ConsumePartition("test.1", 0, producedMessages[0].Offset) + if err != nil { + t.Fatal(err) + } + defer safeClose(t, pc) + + // Consume as many messages as there have been produced and make sure that + // order is preserved. + for i, prodMsg := range producedMessages { + select { + case consMsg := <-pc.Messages(): + if consMsg.Offset != prodMsg.Offset { + t.Fatalf("Consumed unexpected offset: version=%s, index=%d, want=%s, got=%s", + consVer, i, prodMsg2Str(prodMsg), consMsg2Str(consMsg)) + } + if string(consMsg.Value) != string(prodMsg.Value.(StringEncoder)) { + t.Fatalf("Consumed unexpected msg: version=%s, index=%d, want=%s, got=%s", + consVer, i, prodMsg2Str(prodMsg), consMsg2Str(consMsg)) + } + case <-time.After(3 * time.Second): + t.Fatalf("Timeout waiting for: index=%d, offset=%d, msg=%s", i, prodMsg.Offset, prodMsg.Value) } - case <-time.After(3 * time.Second): - t.Fatalf("Timeout waiting for: index=%d, offset=%d, msg=%s", i, prodMsg.Offset, prodMsg.Value) } - } + }) } }