Skip to content

Commit

Permalink
partitionConsumer: continue preferred read replica changes
Browse files Browse the repository at this point in the history
Address comments on IBM#1696.

Add and use only a new preferredReadReplica field; the separate replicaInited flag is removed.

Move setting of the preferredReadReplica field up in
parseResponse. This causes the returned preferred read replica to be
used even if there are no records in the fetch response.

Set FetchResponse.PreferredReadReplica to -1 when decoding versions
prior to 11.

Add more tests.
  • Loading branch information
danp committed Oct 4, 2020
1 parent a471995 commit f78211f
Show file tree
Hide file tree
Showing 6 changed files with 251 additions and 27 deletions.
2 changes: 1 addition & 1 deletion client.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ type Client interface {
// Brokers returns the current set of active brokers as retrieved from cluster metadata.
Brokers() []*Broker

// Broker returns the active Broker if available for the brokerID
// Broker returns the active Broker if available for the broker ID.
Broker(brokerID int32) (*Broker, error)

// Topics returns the set of available topics as retrieved from cluster metadata.
Expand Down
43 changes: 23 additions & 20 deletions consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -303,7 +303,6 @@ type partitionConsumer struct {
errors chan *ConsumerError
feeder chan *FetchResponse

replicaInited bool
preferredReadReplica int32

trigger, dying chan none
Expand Down Expand Up @@ -366,8 +365,8 @@ func (child *partitionConsumer) dispatcher() {
close(child.feeder)
}

func (child *partitionConsumer) preferedBroker() (*Broker, error) {
if child.replicaInited {
func (child *partitionConsumer) preferredBroker() (*Broker, error) {
if child.preferredReadReplica >= 0 {
broker, err := child.consumer.client.Broker(child.preferredReadReplica)
if err == nil {
return broker, nil
Expand All @@ -382,13 +381,16 @@ func (child *partitionConsumer) dispatch() error {
if err := child.consumer.client.RefreshMetadata(child.topic); err != nil {
return err
}
var node *Broker
var err error
if node, err = child.preferedBroker(); err != nil {

broker, err := child.preferredBroker()
if err != nil {
return err
}
child.broker = child.consumer.refBrokerConsumer(node)

child.broker = child.consumer.refBrokerConsumer(broker)

child.broker.input <- child

return nil
}

Expand Down Expand Up @@ -460,6 +462,7 @@ func (child *partitionConsumer) responseFeeder() {
feederLoop:
for response := range child.feeder {
msgs, child.responseResult = child.parseResponse(response)

if child.responseResult == nil {
atomic.StoreInt32(&child.retries, 0)
}
Expand Down Expand Up @@ -497,6 +500,7 @@ feederLoop:
}
}
}

child.broker.acks.Done()
}

Expand Down Expand Up @@ -602,6 +606,8 @@ func (child *partitionConsumer) parseResponse(response *FetchResponse) ([]*Consu

consumerBatchSizeMetric.Update(int64(nRecs))

child.preferredReadReplica = block.PreferredReadReplica

if nRecs == 0 {
partialTrailingMessage, err := block.isPartial()
if err != nil {
Expand Down Expand Up @@ -633,12 +639,6 @@ func (child *partitionConsumer) parseResponse(response *FetchResponse) ([]*Consu
child.fetchSize = child.conf.Consumer.Fetch.Default
atomic.StoreInt64(&child.highWaterMarkOffset, block.HighWaterMarkOffset)

if response.Version == 11 && len(child.consumer.conf.RackID) > 0 {
// we got a valid response with messages. update child's preferredReadReplica from the FetchResponseBlock
child.replicaInited = true
child.preferredReadReplica = block.PreferredReadReplica
}

// abortedProducerIDs contains producerID which message should be ignored as uncommitted
// - producerID are added when the partitionConsumer iterate over the offset at which an aborted transaction begins (abortedTransaction.FirstOffset)
// - producerID are removed when partitionConsumer iterate over an aborted controlRecord, meaning the aborted transaction for this producer is over
Expand Down Expand Up @@ -837,16 +837,20 @@ func (bc *brokerConsumer) handleResponses() {
for child := range bc.subscriptions {
result := child.responseResult
child.responseResult = nil
switch result {
case nil:
if !child.replicaInited {
return
}
if bc.broker.ID() != child.preferredReadReplica {

if result == nil {
if child.preferredReadReplica >= 0 && bc.broker.ID() != child.preferredReadReplica {
// not an error but needs redispatching to consume from preferred replica
child.trigger <- none{}
delete(bc.subscriptions, child)
}
continue
}

// Discard any replica preference.
child.preferredReadReplica = -1

switch result {
case errTimedOut:
Logger.Printf("consumer/broker/%d abandoned subscription to %s/%d because consuming was taking too long\n",
bc.broker.ID(), child.topic, child.partition)
Expand All @@ -862,7 +866,6 @@ func (bc *brokerConsumer) handleResponses() {
// not an error, but does need redispatching
Logger.Printf("consumer/broker/%d abandoned subscription to %s/%d because %s\n",
bc.broker.ID(), child.topic, child.partition, result)
child.replicaInited = false
child.trigger <- none{}
delete(bc.subscriptions, child)
default:
Expand Down
215 changes: 211 additions & 4 deletions consumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -625,13 +625,148 @@ func TestConsumeMessagesFromReadReplica(t *testing.T) {
fetchResponse1.AddMessage("my_topic", 0, nil, testMsg, 1)
fetchResponse1.AddMessage("my_topic", 0, nil, testMsg, 2)
block1 := fetchResponse1.GetBlock("my_topic", 0)
block1.PreferredReadReplica = 1
block1.PreferredReadReplica = -1

fetchResponse2 := &FetchResponse{Version: 11}
// Create a block with no records.
block2 := fetchResponse1.getOrCreateBlock("my_topic", 0)
block2.PreferredReadReplica = 1

fetchResponse3 := &FetchResponse{Version: 11}
fetchResponse3.AddMessage("my_topic", 0, nil, testMsg, 3)
fetchResponse3.AddMessage("my_topic", 0, nil, testMsg, 4)
block3 := fetchResponse3.GetBlock("my_topic", 0)
block3.PreferredReadReplica = -1

fetchResponse4 := &FetchResponse{Version: 11}
fetchResponse4.AddMessage("my_topic", 0, nil, testMsg, 5)
fetchResponse4.AddMessage("my_topic", 0, nil, testMsg, 6)
block4 := fetchResponse4.GetBlock("my_topic", 0)
block4.PreferredReadReplica = -1

cfg := NewConfig()
cfg.Version = V2_3_0_0
cfg.RackID = "consumer_rack"

leader := NewMockBroker(t, 0)
broker0 := NewMockBroker(t, 1)

leader.SetHandlerByMap(map[string]MockResponse{
"MetadataRequest": NewMockMetadataResponse(t).
SetBroker(broker0.Addr(), broker0.BrokerID()).
SetBroker(leader.Addr(), leader.BrokerID()).
SetLeader("my_topic", 0, leader.BrokerID()),
"OffsetRequest": NewMockOffsetResponse(t).
SetVersion(1).
SetOffset("my_topic", 0, OffsetNewest, 1234).
SetOffset("my_topic", 0, OffsetOldest, 0),
"FetchRequest": NewMockSequence(fetchResponse1, fetchResponse2),
})

broker0.SetHandlerByMap(map[string]MockResponse{
"MetadataRequest": NewMockMetadataResponse(t).
SetBroker(broker0.Addr(), broker0.BrokerID()).
SetBroker(leader.Addr(), leader.BrokerID()).
SetLeader("my_topic", 0, leader.BrokerID()),
"OffsetRequest": NewMockOffsetResponse(t).
SetVersion(1).
SetOffset("my_topic", 0, OffsetNewest, 1234).
SetOffset("my_topic", 0, OffsetOldest, 0),
"FetchRequest": NewMockSequence(fetchResponse3, fetchResponse4),
})

master, err := NewConsumer([]string{broker0.Addr()}, cfg)
if err != nil {
t.Fatal(err)
}

// When
consumer, err := master.ConsumePartition("my_topic", 0, 1)
if err != nil {
t.Fatal(err)
}

assertMessageOffset(t, <-consumer.Messages(), 1)
assertMessageOffset(t, <-consumer.Messages(), 2)
assertMessageOffset(t, <-consumer.Messages(), 3)
assertMessageOffset(t, <-consumer.Messages(), 4)

safeClose(t, consumer)
safeClose(t, master)
broker0.Close()
leader.Close()
}

// TestConsumeMessagesFromReadReplicaLeaderFallback consumes partition 0 of
// "my_topic" from a cluster made up of a single mock broker (the leader,
// created with ID 0). The first fetch response names broker 5 as the
// preferred read replica, but broker 5 is never registered in the mock
// metadata. The test passes only if all four messages (offsets 1-4) are
// still delivered — presumably by falling back to the leader, as the test
// name indicates.
func TestConsumeMessagesFromReadReplicaLeaderFallback(t *testing.T) {
	// Given
	fetchResponse1 := &FetchResponse{Version: 11}
	fetchResponse1.AddMessage("my_topic", 0, nil, testMsg, 1)
	fetchResponse1.AddMessage("my_topic", 0, nil, testMsg, 2)
	block1 := fetchResponse1.GetBlock("my_topic", 0)
	block1.PreferredReadReplica = 5 // Does not exist.

	// The second response carries no replica preference (-1). The value is
	// assigned exactly once; an earlier assignment of 1 was a dead store.
	fetchResponse2 := &FetchResponse{Version: 11}
	fetchResponse2.AddMessage("my_topic", 0, nil, testMsg, 3)
	fetchResponse2.AddMessage("my_topic", 0, nil, testMsg, 4)
	block2 := fetchResponse2.GetBlock("my_topic", 0)
	block2.PreferredReadReplica = -1

	cfg := NewConfig()
	cfg.Version = V2_3_0_0
	cfg.RackID = "consumer_rack"

	leader := NewMockBroker(t, 0)

	// The leader is the only broker advertised in metadata and serves both
	// fetch responses in sequence.
	leader.SetHandlerByMap(map[string]MockResponse{
		"MetadataRequest": NewMockMetadataResponse(t).
			SetBroker(leader.Addr(), leader.BrokerID()).
			SetLeader("my_topic", 0, leader.BrokerID()),
		"OffsetRequest": NewMockOffsetResponse(t).
			SetVersion(1).
			SetOffset("my_topic", 0, OffsetNewest, 1234).
			SetOffset("my_topic", 0, OffsetOldest, 0),
		"FetchRequest": NewMockSequence(fetchResponse1, fetchResponse2),
	})

	master, err := NewConsumer([]string{leader.Addr()}, cfg)
	if err != nil {
		t.Fatal(err)
	}

	// When
	consumer, err := master.ConsumePartition("my_topic", 0, 1)
	if err != nil {
		t.Fatal(err)
	}

	// Then: messages from both fetch responses arrive in offset order.
	assertMessageOffset(t, <-consumer.Messages(), 1)
	assertMessageOffset(t, <-consumer.Messages(), 2)
	assertMessageOffset(t, <-consumer.Messages(), 3)
	assertMessageOffset(t, <-consumer.Messages(), 4)

	safeClose(t, consumer)
	safeClose(t, master)
	leader.Close()
}

func TestConsumeMessagesFromReadReplicaErrorReplicaNotAvailable(t *testing.T) {
// Given
fetchResponse1 := &FetchResponse{Version: 11}
block1 := fetchResponse1.getOrCreateBlock("my_topic", 0)
block1.PreferredReadReplica = 1

fetchResponse2 := &FetchResponse{Version: 11}
fetchResponse2.AddMessage("my_topic", 0, nil, testMsg, 1)
fetchResponse2.AddMessage("my_topic", 0, nil, testMsg, 2)
block2 := fetchResponse2.GetBlock("my_topic", 0)
block2.PreferredReadReplica = -1

fetchResponse3 := &FetchResponse{Version: 11}
fetchResponse3.AddError("my_topic", 0, ErrReplicaNotAvailable)

fetchResponse4 := &FetchResponse{Version: 11}
fetchResponse4.AddMessage("my_topic", 0, nil, testMsg, 3)
fetchResponse4.AddMessage("my_topic", 0, nil, testMsg, 4)

cfg := NewConfig()
cfg.Version = V2_3_0_0
Expand All @@ -649,7 +784,79 @@ func TestConsumeMessagesFromReadReplica(t *testing.T) {
SetVersion(1).
SetOffset("my_topic", 0, OffsetNewest, 1234).
SetOffset("my_topic", 0, OffsetOldest, 0),
"FetchRequest": NewMockSequence(fetchResponse1),
"FetchRequest": NewMockSequence(fetchResponse1, fetchResponse4),
})

broker0.SetHandlerByMap(map[string]MockResponse{
"MetadataRequest": NewMockMetadataResponse(t).
SetBroker(broker0.Addr(), broker0.BrokerID()).
SetBroker(leader.Addr(), leader.BrokerID()).
SetLeader("my_topic", 0, leader.BrokerID()),
"OffsetRequest": NewMockOffsetResponse(t).
SetVersion(1).
SetOffset("my_topic", 0, OffsetNewest, 1234).
SetOffset("my_topic", 0, OffsetOldest, 0),
"FetchRequest": NewMockSequence(fetchResponse2, fetchResponse3),
})

master, err := NewConsumer([]string{broker0.Addr()}, cfg)
if err != nil {
t.Fatal(err)
}

// When
consumer, err := master.ConsumePartition("my_topic", 0, 1)
if err != nil {
t.Fatal(err)
}

assertMessageOffset(t, <-consumer.Messages(), 1)
assertMessageOffset(t, <-consumer.Messages(), 2)
assertMessageOffset(t, <-consumer.Messages(), 3)
assertMessageOffset(t, <-consumer.Messages(), 4)

safeClose(t, consumer)
safeClose(t, master)
broker0.Close()
leader.Close()
}

func TestConsumeMessagesFromReadReplicaErrorUnknown(t *testing.T) {
// Given
fetchResponse1 := &FetchResponse{Version: 11}
block1 := fetchResponse1.getOrCreateBlock("my_topic", 0)
block1.PreferredReadReplica = 1

fetchResponse2 := &FetchResponse{Version: 11}
fetchResponse2.AddMessage("my_topic", 0, nil, testMsg, 1)
fetchResponse2.AddMessage("my_topic", 0, nil, testMsg, 2)
block2 := fetchResponse2.GetBlock("my_topic", 0)
block2.PreferredReadReplica = -1

fetchResponse3 := &FetchResponse{Version: 11}
fetchResponse3.AddError("my_topic", 0, ErrUnknown)

fetchResponse4 := &FetchResponse{Version: 11}
fetchResponse4.AddMessage("my_topic", 0, nil, testMsg, 3)
fetchResponse4.AddMessage("my_topic", 0, nil, testMsg, 4)

cfg := NewConfig()
cfg.Version = V2_3_0_0
cfg.RackID = "consumer_rack"

leader := NewMockBroker(t, 0)
broker0 := NewMockBroker(t, 1)

leader.SetHandlerByMap(map[string]MockResponse{
"MetadataRequest": NewMockMetadataResponse(t).
SetBroker(broker0.Addr(), broker0.BrokerID()).
SetBroker(leader.Addr(), leader.BrokerID()).
SetLeader("my_topic", 0, leader.BrokerID()),
"OffsetRequest": NewMockOffsetResponse(t).
SetVersion(1).
SetOffset("my_topic", 0, OffsetNewest, 1234).
SetOffset("my_topic", 0, OffsetOldest, 0),
"FetchRequest": NewMockSequence(fetchResponse1, fetchResponse4),
})

broker0.SetHandlerByMap(map[string]MockResponse{
Expand All @@ -661,7 +868,7 @@ func TestConsumeMessagesFromReadReplica(t *testing.T) {
SetVersion(1).
SetOffset("my_topic", 0, OffsetNewest, 1234).
SetOffset("my_topic", 0, OffsetOldest, 0),
"FetchRequest": NewMockSequence(fetchResponse2),
"FetchRequest": NewMockSequence(fetchResponse2, fetchResponse3),
})

master, err := NewConsumer([]string{broker0.Addr()}, cfg)
Expand Down
4 changes: 2 additions & 2 deletions errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@ import (
// or otherwise failed to respond.
var ErrOutOfBrokers = errors.New("kafka: client has run out of available brokers to talk to (Is your cluster reachable?)")

// ErrBrokerNotFound is returned when there's no broker found for the requested id.
var ErrBrokerNotFound = errors.New("kafka: broker for id is not found")
// ErrBrokerNotFound is the error returned when there's no broker found for the requested ID.
var ErrBrokerNotFound = errors.New("kafka: broker for ID is not found")

// ErrClosedClient is the error returned when a method is called on a client that has been closed.
var ErrClosedClient = errors.New("kafka: tried to use a client that was closed")
Expand Down
2 changes: 2 additions & 0 deletions fetch_response.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,8 @@ func (b *FetchResponseBlock) decode(pd packetDecoder, version int16) (err error)
if err != nil {
return err
}
} else {
b.PreferredReadReplica = -1
}

recordsSize, err := pd.getInt32()
Expand Down
Loading

0 comments on commit f78211f

Please sign in to comment.