KIP-392: Allow consumers to fetch from closest replica #1696
Conversation
@dneralla thanks for contributing! I've taken a first pass over the changes and they look good. Please can you sign the CLA? Have you tried them out against a real 2.4/2.5 cluster with rack-aware support enabled?
@dnwe Signed the CLA. Not sure how the CI is integrated here; it doesn't seem to be running again.
Thanks for contributing 🙏 I've re-triggered CI.
Do you think you could rebase your branch off of latest master please?
@@ -359,21 +362,29 @@ func (child *partitionConsumer) dispatcher() {
	close(child.feeder)
}

func (child *partitionConsumer) preferedBroker() (*Broker, error) {
preferred, not prefered.
@@ -617,6 +626,12 @@ func (child *partitionConsumer) parseResponse(response *FetchResponse) ([]*Consu
	child.fetchSize = child.conf.Consumer.Fetch.Default
	atomic.StoreInt64(&child.highWaterMarkOffset, block.HighWaterMarkOffset)

	if response.Version == 11 && len(child.consumer.conf.RackID) > 0 {
		// we got a valid response with messages; update child's preferredReadReplica from the FetchResponseBlock
		child.replicaInited = true
this name is not descriptive enough; suggest readReplicaConfigured
or similar.
@@ -299,6 +299,9 @@ type partitionConsumer struct {
	errors chan *ConsumerError
	feeder chan *FetchResponse

	replicaInited bool
initiated? what is this going to be used for?
I will rename it... it's used to fetch from the read replica instead of the leader.
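Roughly, the intent looks like this — a sketch paraphrasing the fragments in this PR's diff, not the exact final code: once a fetch response has reported a preferred read replica, future fetches are routed to that broker, otherwise the consumer keeps using the partition leader.

```go
// Sketch only, paraphrasing this PR's diff (names may differ in the final code):
// once a fetch response has reported a preferred read replica, route future
// fetches to that broker; otherwise keep fetching from the partition leader.
func (child *partitionConsumer) preferedBroker() (*Broker, error) {
	if child.replicaInited {
		broker, err := child.consumer.client.Broker(child.preferredReadReplica)
		if err == nil {
			return broker, nil
		}
	}
	// default / fallback: the partition leader
	return child.consumer.client.Leader(child.topic, child.partition)
}
```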
@@ -834,6 +855,7 @@ func (bc *brokerConsumer) handleResponses() {
	// not an error, but does need redispatching
	Logger.Printf("consumer/broker/%d abandoned subscription to %s/%d because %s\n",
		bc.broker.ID(), child.topic, child.partition, result)
	child.replicaInited = false
I wonder if this needs a locking mechanism? It seems unsafe.
Are we sure there aren't multiple goroutines accessing this?
Checked; it's fine to be accessed like this.
if !child.replicaInited {
	return
}
if bc.broker.ID() != child.preferredReadReplica {
I think this would also do the same:
if child.replicaInited && bc.broker.ID() != child.preferredReadReplica
No need to do the return there. Also, the no-op message might be important to leave in, maybe with a modified comment.
Ack. will do this
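i.e. something along these lines — a sketch of the suggested shape only, with an illustrative log message rather than the exact wording used in the PR:

```go
// combine the flag check with the broker comparison; no early return needed,
// and keep a (possibly reworded) log line so the no-op case stays visible
if child.replicaInited && bc.broker.ID() != child.preferredReadReplica {
	Logger.Printf("consumer/broker/%d no longer preferred for %s/%d, redispatching\n",
		bc.broker.ID(), child.topic, child.partition)
	// ... redispatch the child to its preferred read replica ...
}
```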
broker, err := child.consumer.client.Broker(child.preferredReadReplica)
if err == nil {
	return broker, nil
}
Is the error not important? Could we log it?
Not important at all, but we could log it.
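For example, a minimal sketch of logging the lookup failure and falling back to the leader (the exact message wording is illustrative):

```go
broker, err := child.consumer.client.Broker(child.preferredReadReplica)
if err == nil {
	return broker, nil
}
// the lookup failure isn't fatal; log it and fall back to the partition leader
Logger.Printf("consumer/%s/%d could not resolve preferred read replica %d (%v), falling back to leader\n",
	child.topic, child.partition, child.preferredReadReplica, err)
return child.consumer.client.Leader(child.topic, child.partition)
```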
@dneralla did you get around to testing your changes against a 2.4/2.5 cluster yet?
To verify the feature with a positive test case, though, the replica.selector.class broker configuration must be set to the fully qualified class name of a ReplicaSelector implementation; it should be set to org.apache.kafka.common.replica.RackAwareReplicaSelector.
What's the ETA towards merge?
@marc-ostrow still waiting for some confirmation that this has been tested against a properly configured cluster and shown to work before we can merge the feature. I haven't had time to do this testing myself, sadly.
I haven't found time to test with a new Kafka cluster either, sadly. Will try to see if I can set it up this weekend.
I have a small 3-az MSK cluster running Kafka 2.4.1. On May 20th, I realized I needed to set the cluster configuration to include replica.selector.class set to org.apache.kafka.common.replica.RackAwareReplicaSelector so I did that. Today, I cloned https://github.com/dneralla/sarama.git and added a little additional log output to the kip-392 branch. I then wrote a small test driver app in ./examples/kip392verify. I ran it on a utility instance for that MSK cluster.
My consumer started out on the leader broker and stayed there. FetchResponseBlock.decode was receiving PreferredReadReplica set to -1. Yes, I verified the FetchRequest had my rack ID (use1-az1) too. ;-( I searched the CloudWatch broker logs to see what the cluster configuration was set to. A snippet from the broker log just after the restart confirmed that I had successfully updated the cluster configuration; note the values for replica.selector.class and broker.rack. (CloudWatch Logs Insights output omitted.)
@marc-ostrow Did you set the rack ID in the consumer config?
@dneralla Yes, I did. Here's some initial log output showing the brokers, their rack IDs, the leader broker's rack ID, and the rack ID in the encode and the preferred replica in the decode of the fetch request/response.
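For anyone reproducing this, the consumer-side setting is just the top-level Config.RackID field. A minimal sketch of a rack-aware consumer follows; broker address, topic, and rack value are placeholders, and the import path is the one in use at the time of this PR:

```go
package main

import (
	"log"

	"github.com/Shopify/sarama"
)

func main() {
	cfg := sarama.NewConfig()
	cfg.Version = sarama.V2_4_0_0 // KIP-392 needs fetch request v11, i.e. Kafka >= 2.4
	cfg.RackID = "use1-az1"       // must match the broker.rack of the brokers in this AZ

	consumer, err := sarama.NewConsumer([]string{"localhost:29092"}, cfg)
	if err != nil {
		log.Fatal(err)
	}
	defer consumer.Close()

	pc, err := consumer.ConsumePartition("bar", 0, sarama.OffsetNewest)
	if err != nil {
		log.Fatal(err)
	}
	defer pc.Close()

	for msg := range pc.Messages() {
		log.Printf("partition=%d offset=%d value=%s", msg.Partition, msg.Offset, msg.Value)
	}
}
```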
@dneralla Hi, so I also tried running a docker network cluster of three ZooKeeper and three Kafka nodes using confluentinc/cp-kafka:5.4.1, which I believe incorporates Kafka 2.4.1. I actually modified the fetch handler to log the client metadata so I could verify we were indeed sending the rackId; and we are. I cloned the confluentinc/kafka repo, checked out the 5.4.1 tag, made my mods, and built an unsigned tarball. I extracted kafka_2.12-5.4.1-ccs/libs/kafka_2.12-5.4.1-ccs.jar from the kafka_2.12-5.4.1-ccs.tgz tarball. I then built a new image from the confluentinc/cp-kafka:5.4.1 base image, rewriting the VOLUME and CMD dockerfile commands to match the cp-docker-images dockerfile:

```dockerfile
FROM confluentinc/cp-kafka:5.4.1
COPY kafka_2.12-5.4.1-ccs.jar /usr/share/java/kafka/
VOLUME ["/var/lib/kafka/data", "/etc/kafka/secrets"]
CMD ["/etc/confluent/docker/run"]
```

My diffs were as follows:

```diff
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index 124b2e356..6018303a3 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -609,6 +609,10 @@ class KafkaApis(val requestChannel: RequestChannel,
       None
     }
 
+    if (versionId >= 11) {
+      info("handleFetchRequest: clientMetadata is " + clientMetadata.getOrElse("not set"))
+    }
+
     def errorResponse[T >: MemoryRecords <: BaseRecords](error: Errors): FetchResponse.PartitionData[T] = {
       new FetchResponse.PartitionData[T](error, FetchResponse.INVALID_HIGHWATERMARK, FetchResponse.INVALID_LAST_STABLE_OFFSET,
         FetchResponse.INVALID_LOG_START_OFFSET, null, MemoryRecords.EMPTY)
```

Btw, I highly suggest we modify the pull request and change any protocol version checks against version 11 of the API to be true for any version greater than or equal to 11.

I ran my docker network following the confluent.io clustered-deployment example as follows:

create-zoo-cluster

```sh
#!/bin/sh
docker run -d \
--rm \
--net=host \
--name=zk-1 \
-e ZOOKEEPER_SERVER_ID=1 \
-e ZOOKEEPER_CLIENT_PORT=22181 \
-e ZOOKEEPER_TICK_TIME=2000 \
-e ZOOKEEPER_INIT_LIMIT=5 \
-e ZOOKEEPER_SYNC_LIMIT=2 \
-e ZOOKEEPER_SERVERS="localhost:22888:23888;localhost:32888:33888;localhost:42888:43888" \
-e KAFKA_OPTS="-Dzookeeper.4lw.commands.whitelist=*" \
confluentinc/cp-zookeeper:5.4.1
docker run -d \
--rm \
--net=host \
--name=zk-2 \
-e ZOOKEEPER_SERVER_ID=2 \
-e ZOOKEEPER_CLIENT_PORT=32181 \
-e ZOOKEEPER_TICK_TIME=2000 \
-e ZOOKEEPER_INIT_LIMIT=5 \
-e ZOOKEEPER_SYNC_LIMIT=2 \
-e ZOOKEEPER_SERVERS="localhost:22888:23888;localhost:32888:33888;localhost:42888:43888" \
-e KAFKA_OPTS="-Dzookeeper.4lw.commands.whitelist=*" \
confluentinc/cp-zookeeper:5.4.1
docker run -d \
--rm \
--net=host \
--name=zk-3 \
-e ZOOKEEPER_SERVER_ID=3 \
-e ZOOKEEPER_CLIENT_PORT=42181 \
-e ZOOKEEPER_TICK_TIME=2000 \
-e ZOOKEEPER_INIT_LIMIT=5 \
-e ZOOKEEPER_SYNC_LIMIT=2 \
-e ZOOKEEPER_SERVERS="localhost:22888:23888;localhost:32888:33888;localhost:42888:43888" \
-e KAFKA_OPTS="-Dzookeeper.4lw.commands.whitelist=*" \
confluentinc/cp-zookeeper:5.4.1
```

check-zoo-cluster

```sh
#!/bin/sh
for i in 22181 32181 42181; do
docker run --net=host --rm confluentinc/cp-zookeeper:5.4.1 bash -c "echo stat | nc localhost $i | grep Mode"
done
```

create-kafka-cluster

```sh
#!/bin/sh
docker run -d \
--rm \
--net=host \
--name=kafka-1 \
-e KAFKA_ZOOKEEPER_CONNECT=localhost:22181,localhost:32181,localhost:42181 \
-e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://localhost:29092 \
-e KAFKA_BROKER_ID=1 \
-e KAFKA_BROKER_RACK=Rack-1 \
-e KAFKA_REPLICA_SELECTOR_CLASS=org.apache.kafka.common.replica.RackAwareReplicaSelector \
local/kafka
# confluentinc/cp-kafka:5.4.1
docker run -d \
--rm \
--net=host \
--name=kafka-2 \
-e KAFKA_ZOOKEEPER_CONNECT=localhost:22181,localhost:32181,localhost:42181 \
-e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://localhost:39092 \
-e KAFKA_BROKER_ID=2 \
-e KAFKA_BROKER_RACK=Rack-2 \
-e KAFKA_REPLICA_SELECTOR_CLASS=org.apache.kafka.common.replica.RackAwareReplicaSelector \
local/kafka
# confluentinc/cp-kafka:5.4.1
docker run -d \
--rm \
--net=host \
--name=kafka-3 \
-e KAFKA_ZOOKEEPER_CONNECT=localhost:22181,localhost:32181,localhost:42181 \
-e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://localhost:49092 \
-e KAFKA_BROKER_ID=3 \
-e KAFKA_BROKER_RACK=Rack-3 \
-e KAFKA_REPLICA_SELECTOR_CLASS=org.apache.kafka.common.replica.RackAwareReplicaSelector \
local/kafka
# confluentinc/cp-kafka:5.4.1 create-topic #!/bin/sh
docker run \
--rm \
--net=host \
confluentinc/cp-kafka:5.4.1 kafka-topics \
--bootstrap-server localhost:29092,localhost:39092,localhost:49092 \
--create --topic bar --partitions 1 --replication-factor 3
docker run \
--rm \
--net=host \
confluentinc/cp-kafka:5.4.1 kafka-topics \
--bootstrap-server localhost:29092,localhost:39092,localhost:49092 \
--describe --topic bar
```

With each broker having broker.id, broker.rack, and replica.selector.class configured, and a topic with no activity, I finally saw the FetchResponse come back with the expected broker.id of the replica broker in the same rack as my client. Btw, I further modified my instrumentation as follows to reduce log output:

```diff
diff --git a/fetch_request.go b/fetch_request.go
index cffc339..9802ae2 100644
--- a/fetch_request.go
+++ b/fetch_request.go
@@ -65,6 +65,8 @@ const (
ReadCommitted
)
+var encodeCount = uint64(0)
+
func (r *FetchRequest) encode(pe packetEncoder) (err error) {
pe.putInt32(-1) // replica ID is always -1 for clients
pe.putInt32(r.MaxWaitTime)
@@ -124,7 +126,10 @@ func (r *FetchRequest) encode(pe packetEncoder) (err error) {
if err != nil {
return err
}
- Logger.Printf("encode version %d, RackID %s\n", r.Version, r.RackID)
+ if encodeCount%100 == 0 {
+ Logger.Printf("encode version %d, RackID %s\n", r.Version, r.RackID)
+ }
+ encodeCount++
}
return nil
diff --git a/fetch_response.go b/fetch_response.go
index a60e948..291ed9f 100644
--- a/fetch_response.go
+++ b/fetch_response.go
@@ -41,6 +41,8 @@ type FetchResponseBlock struct {
Partial bool
}
+var decodeCount = uint64(0)
+
func (b *FetchResponseBlock) decode(pd packetDecoder, version int16) (err error) {
tmp, err := pd.getInt16()
if err != nil {
@@ -89,7 +91,10 @@ func (b *FetchResponseBlock) decode(pd packetDecoder, version int16) (err error)
if err != nil {
return err
}
- Logger.Printf("decoded version %d, PreferredReadReplica %d\n", version, b.PreferredReadReplica)
+ if decodeCount%100 == 0 {
+ Logger.Printf("decoded version %d, PreferredReadReplica %d\n", version, b.PreferredReadReplica)
+ }
+ decodeCount++
}
recordsSize, err := pd.getInt32()
```

I experienced the following:

Sorry... will have to try again next weekend if I have time.
Rather than always using the partition leader, this lets the consumer application specify a function to select which broker they wish to consume from. The function has the signature `fn(topic string, partition int32, client Client) (*Broker, error)`, allowing the client to implement their own logic for broker selection. If none is specified, the default behavior is backwards compatible and selects the leader. This pulls some changes from IBM#1696, but in our use case we want to spread the load across multiple replicas, not pick based on geographic considerations. I didn't think our specific use case made sense for the general library, but it also wasn't possible without changes to Sarama, so I made a generalizable solution. It might be possible to implement the changes for KIP-392 as a ReplicaSelector function, though that may still require some additional code changes to support communications regarding which broker should be used for a given rack.
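For illustration, a sketch of a selector with that signature that spreads load by picking a random in-sync replica. The function name and any registration mechanism are hypothetical, not upstream Sarama API; only the Client methods used (InSyncReplicas, Brokers, Leader) are existing Sarama calls.

```go
package brokerselect

import (
	"math/rand"

	"github.com/Shopify/sarama"
)

// randomReplicaSelector is a hypothetical selector matching the proposed
// signature: pick a random in-sync replica rather than always the leader,
// falling back to the partition leader whenever anything goes wrong.
func randomReplicaSelector(topic string, partition int32, client sarama.Client) (*sarama.Broker, error) {
	ids, err := client.InSyncReplicas(topic, partition)
	if err != nil || len(ids) == 0 {
		return client.Leader(topic, partition)
	}
	want := ids[rand.Intn(len(ids))]
	for _, b := range client.Brokers() {
		if b.ID() == want {
			return b, nil
		}
	}
	return client.Leader(topic, partition)
}
```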
@@ -617,6 +626,12 @@ func (child *partitionConsumer) parseResponse(response *FetchResponse) ([]*Consu
	child.fetchSize = child.conf.Consumer.Fetch.Default
	atomic.StoreInt64(&child.highWaterMarkOffset, block.HighWaterMarkOffset)

	if response.Version == 11 && len(child.consumer.conf.RackID) > 0 {
I accidentally removed my suggestion. Doing it again.
To support additional Fetch protocol changes, the comparison with version 11 should allow later versions as well: change `response.Version == 11` to `response.Version >= 11`.
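i.e. a sketch of the adjusted check, using the field names from the diff above:

```go
// accept any fetch response version that can carry preferred_read_replica (v11+)
if response.Version >= 11 && len(child.consumer.conf.RackID) > 0 {
	child.preferredReadReplica = block.PreferredReadReplica
	child.replicaInited = true
}
```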
Our major concern with cross-AZ traffic is with bringing in upwards of 7000 MPEG transport streams. Not really an idiomatic use of Kafka, but it works using single-partition topics, one for each stream. If we had this feature, assuming the cluster was performing appropriately, we wouldn't have to worry so much about the cost of cross-AZ traffic with our consumers of these streams. However, we would still have to watch the leader brokers per partition/topic, and move the producers into the same AZ to eliminate cross-AZ traffic. We've basically opted to go the route of gRPC for these streams, which means all consumers will have to operate within the same AZ as their gRPC server to reduce cross-AZ traffic. BUT, has anyone gotten this pull request to work and verified that the bulk of traffic is between a client and its nearest replica (that being a replica in its rack)?
Anyone get this working?
Hello,
@angeloskaltsikis I believe it's still waiting for someone to test the changes and confirm they function as expected by analysing the traffic. The earlier comments seemed to suggest it wasn't working as it should, so we wouldn't want to merge and advertise the functionality until it was.
Address comments on IBM#1696. Change to only adding and using a new preferredReadReplica field. Move setting of the preferredReadReplica field up in parseResponse. This causes the returned preferred read replica to be used even if there are no records in the fetch response. Set FetchResponse.PreferredReadReplica to -1 when decoding versions prior to 11. Add more tests. Change consumeMsgs in functional tests to use subtests to reduce number of outstanding connections.
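The decode-side change described here amounts to something like the following sketch (not the exact committed code), in FetchResponseBlock.decode:

```go
// only v11+ fetch responses carry the preferred_read_replica field;
// default it to -1 ("no preference") when decoding older versions
if version >= 11 {
	b.PreferredReadReplica, err = pd.getInt32()
	if err != nil {
		return err
	}
} else {
	b.PreferredReadReplica = -1
}
```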
Hi there, I spent some time validating this and, hopefully, improving it. Hope you don't mind me taking it a bit further, @dneralla! My branch is here. These are the main changes from this PR:
I also set up a test rig at https://github.com/danp/sarama-rack. It let me send some messages to a cluster in docker-compose and see things working. Maybe that will help others validate this as well. Feel free to integrate my changes however you like, or let me know if there's anything else I can do.
Any ETA on this merge? One thing to note - without this change, if you make the mistake of setting the RackId on the consumer client config with a rack-aware kafka cluster, you may or may not consume messages. Luck of the draw.
Hey, this is no longer my priority and I am not finding time to get to this... can someone take this PR over?
After some very basic testing with [sarama-rack](https://github.com/danp/sarama-rack) some weeks ago, it seemed that the fork of @danp worked as expected. Can someone else try the changes? It's very easy to set up with docker. Are there any particular tests you suggest we run, @dnwe? One thing I believe most people will agree with is that, as soon as this feature is tested and released in sarama, it is going to save a LOT of inter-AZ/cross-AZ traffic costs for a lot of applications using sarama as the base library.
@danp could you please submit a PR with the changes you've implemented on top of this one?
Sure, opened #1822.
Superseded by #1822
Support for [KIP-392](https://cwiki.apache.org/confluence/display/KAFKA/KIP-392%3A+Allow+consumers+to+fetch+from+closest+replica) in Sarama consumers. This is a continuation of #1696 with changes mentioned [here](#1696 (comment)). Co-authored-by: Deepak.Neralla <[email protected]>
KIP-392: Allow consumers to fetch from closest replica