Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KIP-392: Allow consumers to fetch from closest replica #1696

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,9 @@ type Client interface {
// Brokers returns the current set of active brokers as retrieved from cluster metadata.
Brokers() []*Broker

// Broker returns the active Broker if available for the brokerID
Broker(brokerID int32) (*Broker, error)

// Topics returns the set of available topics as retrieved from cluster metadata.
Topics() ([]string, error)

Expand Down Expand Up @@ -196,6 +199,17 @@ func (client *client) Brokers() []*Broker {
return brokers
}

func (client *client) Broker(brokerID int32) (*Broker, error) {
client.lock.RLock()
defer client.lock.RUnlock()
broker, ok := client.brokers[brokerID]
if !ok {
return nil, ErrBrokerNotFound
}
_ = broker.Open(client.conf)
return broker, nil
}

func (client *client) InitProducerID() (*InitProducerIDResponse, error) {
var err error
for broker := client.any(); broker != nil; broker = client.any() {
Expand Down
36 changes: 36 additions & 0 deletions client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -545,6 +545,42 @@ func TestClientRefreshMetadataBrokerOffline(t *testing.T) {
}
}

func TestClientGetBroker(t *testing.T) {
seedBroker := NewMockBroker(t, 1)
leader := NewMockBroker(t, 5)

metadataResponse1 := new(MetadataResponse)
metadataResponse1.AddBroker(leader.Addr(), leader.BrokerID())
metadataResponse1.AddBroker(seedBroker.Addr(), seedBroker.BrokerID())
seedBroker.Returns(metadataResponse1)

client, err := NewClient([]string{seedBroker.Addr()}, nil)
if err != nil {
t.Fatal(err)
}

broker, err := client.Broker(leader.BrokerID())
if err != nil {
t.Fatal(err)
}

if broker.Addr() != leader.Addr() {
t.Errorf("Expected broker to have address %s, found %s", leader.Addr(), broker.Addr())
}

metadataResponse2 := new(MetadataResponse)
metadataResponse2.AddBroker(seedBroker.Addr(), seedBroker.BrokerID())
seedBroker.Returns(metadataResponse2)

if err := client.RefreshMetadata(); err != nil {
t.Error(err)
}
broker, err = client.Broker(leader.BrokerID())
if err != ErrBrokerNotFound {
t.Errorf("Expected Broker(brokerID) to return %v found %v", ErrBrokerNotFound, err)
}
}

func TestClientResurrectDeadSeeds(t *testing.T) {
initialSeed := NewMockBroker(t, 0)
emptyMetadata := new(MetadataResponse)
Expand Down
44 changes: 33 additions & 11 deletions consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -299,6 +299,9 @@ type partitionConsumer struct {
errors chan *ConsumerError
feeder chan *FetchResponse

replicaInited bool
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

initiated? what is this going to be used for?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I will rename it...its used to fetch from read replica instead of master.

preferredReadReplica int32

trigger, dying chan none
closeOnce sync.Once
topic string
Expand Down Expand Up @@ -359,21 +362,29 @@ func (child *partitionConsumer) dispatcher() {
close(child.feeder)
}

func (child *partitionConsumer) preferedBroker() (*Broker, error) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

preferred, not prefered.

if child.replicaInited {
broker, err := child.consumer.client.Broker(child.preferredReadReplica)
if err == nil {
return broker, nil
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

does the error not important? could we log it?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not important at all..but could log it.

}

// if prefered replica cannot be found fallback to leader
return child.consumer.client.Leader(child.topic, child.partition)
}

func (child *partitionConsumer) dispatch() error {
if err := child.consumer.client.RefreshMetadata(child.topic); err != nil {
return err
}

var leader *Broker
var node *Broker
var err error
if leader, err = child.consumer.client.Leader(child.topic, child.partition); err != nil {
if node, err = child.preferedBroker(); err != nil {
return err
}

child.broker = child.consumer.refBrokerConsumer(leader)

child.broker = child.consumer.refBrokerConsumer(node)
child.broker.input <- child

return nil
}

Expand Down Expand Up @@ -445,7 +456,6 @@ func (child *partitionConsumer) responseFeeder() {
feederLoop:
for response := range child.feeder {
msgs, child.responseResult = child.parseResponse(response)

if child.responseResult == nil {
atomic.StoreInt32(&child.retries, 0)
}
Expand Down Expand Up @@ -480,7 +490,6 @@ feederLoop:
}
}
}

child.broker.acks.Done()
}

Expand Down Expand Up @@ -617,6 +626,12 @@ func (child *partitionConsumer) parseResponse(response *FetchResponse) ([]*Consu
child.fetchSize = child.conf.Consumer.Fetch.Default
atomic.StoreInt64(&child.highWaterMarkOffset, block.HighWaterMarkOffset)

if response.Version == 11 && len(child.consumer.conf.RackID) > 0 {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I accidently removed my suggestion. Doing it again.

To support additional Fetch protocol changes, the comparison with version 11 should allow this.
change
response.Version == 11
to
response.Version >= 11

// we got a valid response with messages. update child's preferredReadReplica from the FetchResponseBlock
child.replicaInited = true
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this name is not descriptive enough; suggest readReplicaConfigured or similar.

child.preferredReadReplica = block.PreferredReadReplica
}

// abortedProducerIDs contains producerID which message should be ignored as uncommitted
// - producerID are added when the partitionConsumer iterate over the offset at which an aborted transaction begins (abortedTransaction.FirstOffset)
// - producerID are removed when partitionConsumer iterate over an aborted controlRecord, meaning the aborted transaction for this producer is over
Expand Down Expand Up @@ -815,10 +830,16 @@ func (bc *brokerConsumer) handleResponses() {
for child := range bc.subscriptions {
result := child.responseResult
child.responseResult = nil

switch result {
case nil:
// no-op
if !child.replicaInited {
return
}
if bc.broker.ID() != child.preferredReadReplica {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this would also do the same:

if child.replicaInited && bc.broker.ID() != child.preferredReadReplica

not need to do the return there

also the no-op message might be important to leave it, maybe with a modified comment

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ack. will do this

// not an error but needs redispatching to consume from prefered replica
child.trigger <- none{}
delete(bc.subscriptions, child)
}
case errTimedOut:
Logger.Printf("consumer/broker/%d abandoned subscription to %s/%d because consuming was taking too long\n",
bc.broker.ID(), child.topic, child.partition)
Expand All @@ -834,6 +855,7 @@ func (bc *brokerConsumer) handleResponses() {
// not an error, but does need redispatching
Logger.Printf("consumer/broker/%d abandoned subscription to %s/%d because %s\n",
bc.broker.ID(), child.topic, child.partition, result)
child.replicaInited = false
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if this needs a locking mechanism? seems unsafe
are we sure there are no multiple go routines accessing this?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Checked. it it's fine to be accessed like this.

child.trigger <- none{}
delete(bc.subscriptions, child)
default:
Expand Down
67 changes: 67 additions & 0 deletions consumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -618,6 +618,73 @@ func TestConsumeMessageWithSessionIDs(t *testing.T) {
}
}

func TestConsumeMessagesFromReadReplica(t *testing.T) {
// Given
fetchResponse1 := &FetchResponse{Version: 11}
fetchResponse1.AddMessage("my_topic", 0, nil, testMsg, 1)
fetchResponse1.AddMessage("my_topic", 0, nil, testMsg, 2)
block1 := fetchResponse1.GetBlock("my_topic", 0)
block1.PreferredReadReplica = 1

fetchResponse2 := &FetchResponse{Version: 11}
fetchResponse2.AddMessage("my_topic", 0, nil, testMsg, 3)
fetchResponse2.AddMessage("my_topic", 0, nil, testMsg, 4)
block2 := fetchResponse2.GetBlock("my_topic", 0)
block2.PreferredReadReplica = 1

cfg := NewConfig()
cfg.Version = V2_3_0_0
cfg.RackID = "consumer_rack"

leader := NewMockBroker(t, 0)
broker0 := NewMockBroker(t, 1)

leader.SetHandlerByMap(map[string]MockResponse{
"MetadataRequest": NewMockMetadataResponse(t).
SetBroker(broker0.Addr(), broker0.BrokerID()).
SetBroker(leader.Addr(), leader.BrokerID()).
SetLeader("my_topic", 0, leader.BrokerID()),
"OffsetRequest": NewMockOffsetResponse(t).
SetVersion(1).
SetOffset("my_topic", 0, OffsetNewest, 1234).
SetOffset("my_topic", 0, OffsetOldest, 0),
"FetchRequest": NewMockSequence(fetchResponse1),
})

broker0.SetHandlerByMap(map[string]MockResponse{
"MetadataRequest": NewMockMetadataResponse(t).
SetBroker(broker0.Addr(), broker0.BrokerID()).
SetBroker(leader.Addr(), leader.BrokerID()).
SetLeader("my_topic", 0, leader.BrokerID()),
"OffsetRequest": NewMockOffsetResponse(t).
SetVersion(1).
SetOffset("my_topic", 0, OffsetNewest, 1234).
SetOffset("my_topic", 0, OffsetOldest, 0),
"FetchRequest": NewMockSequence(fetchResponse2),
})

master, err := NewConsumer([]string{broker0.Addr()}, cfg)
if err != nil {
t.Fatal(err)
}

// When
consumer, err := master.ConsumePartition("my_topic", 0, 1)
if err != nil {
t.Fatal(err)
}

assertMessageOffset(t, <-consumer.Messages(), 1)
assertMessageOffset(t, <-consumer.Messages(), 2)
assertMessageOffset(t, <-consumer.Messages(), 3)
assertMessageOffset(t, <-consumer.Messages(), 4)

safeClose(t, consumer)
safeClose(t, master)
broker0.Close()
leader.Close()
}

// It is fine if offsets of fetched messages are not sequential (although
// strictly increasing!).
func TestConsumerNonSequentialOffsets(t *testing.T) {
Expand Down
3 changes: 3 additions & 0 deletions errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,9 @@ import (
// or otherwise failed to respond.
var ErrOutOfBrokers = errors.New("kafka: client has run out of available brokers to talk to (Is your cluster reachable?)")

// ErrBrokerNotFound is returned when there's no broker found for the requested id.
var ErrBrokerNotFound = errors.New("kafka: broker for id is not found")

// ErrClosedClient is the error returned when a method is called on a client that has been closed.
var ErrClosedClient = errors.New("kafka: tried to use a client that was closed")

Expand Down