Skip to content

Commit

Permalink
Allow consumer configuration to specify replica selector
Browse files Browse the repository at this point in the history
Rather than always using the leader partition, this lets the consumer
application specify a function to select which broker they wish to
consume from. The function has the signature

    fn(topic string, partition int32, client Client) (*Broker, error)

Allowing the client to implement their own logic for broker slection.
If none is specified, the default behavior is backwards compatible
and selects the leader.

This pulls some changes from IBM#1696, but in our use case we want to
spread the load across multiple replicas, not pick based on geographic
considerations. I didn't think our specific use case made sense for
the general library, but it also wasn't possible without changes to
Sarama, so I made a generalizable solution. It might be possible to
implement the changes for KIP-392 as a ReplicaSelector function,
though that may still require some additional code changes to support
communications regarding which broker should be used for a given rack.
  • Loading branch information
AusIV committed Jun 19, 2020
1 parent 6523153 commit a7760f7
Show file tree
Hide file tree
Showing 6 changed files with 125 additions and 8 deletions.
14 changes: 14 additions & 0 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,9 @@ type Client interface {
// Brokers returns the current set of active brokers as retrieved from cluster metadata.
Brokers() []*Broker

// Broker returns the active Broker if available for the brokerID
Broker(brokerID int32) (*Broker, error)

// Topics returns the set of available topics as retrieved from cluster metadata.
Topics() ([]string, error)

Expand Down Expand Up @@ -196,6 +199,17 @@ func (client *client) Brokers() []*Broker {
return brokers
}

func (client *client) Broker(brokerID int32) (*Broker, error) {
client.lock.RLock()
defer client.lock.RUnlock()
broker, ok := client.brokers[brokerID]
if !ok {
return nil, ErrBrokerNotFound
}
_ = broker.Open(client.conf)
return broker, nil
}

func (client *client) InitProducerID() (*InitProducerIDResponse, error) {
var err error
for broker := client.any(); broker != nil; broker = client.any() {
Expand Down
36 changes: 36 additions & 0 deletions client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -545,6 +545,42 @@ func TestClientRefreshMetadataBrokerOffline(t *testing.T) {
}
}

func TestClientGetBroker(t *testing.T) {
seedBroker := NewMockBroker(t, 1)
leader := NewMockBroker(t, 5)

metadataResponse1 := new(MetadataResponse)
metadataResponse1.AddBroker(leader.Addr(), leader.BrokerID())
metadataResponse1.AddBroker(seedBroker.Addr(), seedBroker.BrokerID())
seedBroker.Returns(metadataResponse1)

client, err := NewClient([]string{seedBroker.Addr()}, nil)
if err != nil {
t.Fatal(err)
}

broker, err := client.Broker(leader.BrokerID())
if err != nil {
t.Fatal(err)
}

if broker.Addr() != leader.Addr() {
t.Errorf("Expected broker to have address %s, found %s", leader.Addr(), broker.Addr())
}

metadataResponse2 := new(MetadataResponse)
metadataResponse2.AddBroker(seedBroker.Addr(), seedBroker.BrokerID())
seedBroker.Returns(metadataResponse2)

if err := client.RefreshMetadata(); err != nil {
t.Error(err)
}
broker, err = client.Broker(leader.BrokerID())
if err != ErrBrokerNotFound {
t.Errorf("Expected Broker(brokerID) to return %v found %v", ErrBrokerNotFound, err)
}
}

func TestClientResurrectDeadSeeds(t *testing.T) {
initialSeed := NewMockBroker(t, 0)
emptyMetadata := new(MetadataResponse)
Expand Down
4 changes: 4 additions & 0 deletions config.go
Original file line number Diff line number Diff line change
Expand Up @@ -391,6 +391,10 @@ type Config struct {
// - use `ReadUncommitted` (default) to consume and return all messages in message channel
// - use `ReadCommitted` to hide messages that are part of an aborted transaction
IsolationLevel IsolationLevel

// Called to select the consumer's replica dynamically. Defaults to the
// partition leader.
ReplicaSelector func(topic string, partition int32, client Client) (*Broker, error)
}

// A user-provided string sent with every request to the brokers for logging,
Expand Down
28 changes: 20 additions & 8 deletions consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,10 +144,16 @@ func (c *consumer) ConsumePartition(topic string, partition int32, offset int64)
return nil, err
}

var leader *Broker
var replica *Broker
var err error
if leader, err = c.client.Leader(child.topic, child.partition); err != nil {
return nil, err
if c.conf.Consumer.ReplicaSelector == nil {
if replica, err = c.client.Leader(child.topic, child.partition); err != nil {
return nil, err
}
} else {
if replica, err = c.conf.Consumer.ReplicaSelector(child.topic, child.partition, c.client); err != nil {
return nil, err
}
}

if err := c.addChild(child); err != nil {
Expand All @@ -157,7 +163,7 @@ func (c *consumer) ConsumePartition(topic string, partition int32, offset int64)
go withRecover(child.dispatcher)
go withRecover(child.responseFeeder)

child.broker = c.refBrokerConsumer(leader)
child.broker = c.refBrokerConsumer(replica)
child.broker.input <- child

return child, nil
Expand Down Expand Up @@ -364,13 +370,19 @@ func (child *partitionConsumer) dispatch() error {
return err
}

var leader *Broker
var replica *Broker
var err error
if leader, err = child.consumer.client.Leader(child.topic, child.partition); err != nil {
return err
if child.conf.Consumer.ReplicaSelector == nil {
if replica, err = child.consumer.client.Leader(child.topic, child.partition); err != nil {
return err
}
} else {
if replica, err = child.conf.Consumer.ReplicaSelector(child.topic, child.partition, child.consumer.client); err != nil {
return err
}
}

child.broker = child.consumer.refBrokerConsumer(leader)
child.broker = child.consumer.refBrokerConsumer(replica)

child.broker.input <- child

Expand Down
48 changes: 48 additions & 0 deletions consumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -526,6 +526,54 @@ func TestConsumerReceivingFetchResponseWithTooOldRecords(t *testing.T) {
broker0.Close()
}

func TestConsumerSelector(t *testing.T) {
broker0 := NewMockBroker(t, 0)
broker1 := NewMockBroker(t, 1)
broker0.SetHandlerByMap(map[string]MockResponse{
"MetadataRequest": NewMockMetadataResponse(t).
SetBroker(broker0.Addr(), broker0.BrokerID()).
SetLeader("my_topic", 0, broker0.BrokerID()),
"OffsetRequest": NewMockOffsetResponse(t).
SetOffset("my_topic", 0, OffsetNewest, 10).
SetOffset("my_topic", 0, OffsetOldest, 7),
"FetchRequest": NewMockFetchResponse(t, 1).
SetMessage("my_topic", 0, 9, testMsg).
SetMessage("my_topic", 0, 10, testMsg).
SetMessage("my_topic", 0, 11, testMsg).
SetHighWaterMark("my_topic", 0, 14),
})
broker1.SetHandlerByMap(map[string]MockResponse{
"MetadataRequest": NewMockMetadataResponse(t).
SetBroker(broker1.Addr(), broker1.BrokerID()).
SetLeader("my_topic", 0, broker1.BrokerID()),
"OffsetRequest": NewMockOffsetResponse(t).
SetOffset("my_topic", 0, OffsetNewest, 10).
SetOffset("my_topic", 0, OffsetOldest, 7),
"FetchRequest": NewMockFetchResponse(t, 1).
SetMessage("my_topic", 0, 9, testMsg).
SetMessage("my_topic", 0, 10, testMsg).
SetMessage("my_topic", 0, 11, testMsg).
SetHighWaterMark("my_topic", 0, 14),
})

cfg := NewConfig()
cfg.Consumer.ReplicaSelector = func(topic string, partition int32, client Client) (*Broker, error) {
return client.Broker(1)
}
master, err := NewConsumer([]string{broker0.Addr(),broker1.Addr()}, cfg)
if err != nil {
t.Fatal(err)
}

// When
consumer, err := master.ConsumePartition("my_topic", 0, OffsetNewest)
if err != nil {
t.Fatal(err)
}

if id := consumer.(*partitionConsumer).broker.broker.id; id != 1 {t.Fatalf("Unexpected broker id %v", id)}
}

func TestConsumeMessageWithNewerFetchAPIVersion(t *testing.T) {
// Given
fetchResponse1 := &FetchResponse{Version: 4}
Expand Down
3 changes: 3 additions & 0 deletions errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,9 @@ import (
// or otherwise failed to respond.
var ErrOutOfBrokers = errors.New("kafka: client has run out of available brokers to talk to (Is your cluster reachable?)")

// ErrBrokerNotFound is returned when there's no broker found for the requested id.
var ErrBrokerNotFound = errors.New("kafka: broker for id is not found")

// ErrClosedClient is the error returned when a method is called on a client that has been closed.
var ErrClosedClient = errors.New("kafka: tried to use a client that was closed")

Expand Down

0 comments on commit a7760f7

Please sign in to comment.