This repository has been archived by the owner on Nov 29, 2024. It is now read-only.

Extended consumer latency recorded after partition leadership reverts to preferred #204

Closed
k-wall opened this issue Nov 23, 2022 · 12 comments · Fixed by #208

Comments

@k-wall
Contributor

k-wall commented Nov 23, 2022

Using strimzi-canary 0.5.0 against a three-broker instance, I am noticing that consumer latencies are sometimes unexpectedly high for a period. The problem persists for about 10 minutes and then resolves without any intervention.

Here's the sequence of events that causes the situation:

  1. Canary running against cluster recording short produce/consume latencies against all three partitions.
  2. Restart one broker (broker=2)
  3. Partition leadership moves from broker 2 to either broker 0 or 1
  4. Canary continues to record short produce/consume latencies against all partitions.
  5. After a period the cluster reverts the partition leadership back to its preferred broker (broker config: auto.leader.rebalance.enable = true, leader.imbalance.check.interval.seconds = 300, leader.imbalance.per.broker.percentage = 0)
  6. Canary continues to record short produce latencies, but the consume latency for the partition whose leadership is reverted is considerably extended (unexpected). The extended consume latencies continue for many minutes.
  7. After about 10 minutes of extended consume latencies for that partition, the problem disappears.

The problem is impactful because it leads to seemingly spurious latency alerts.

Here's what the logs look like:

At 1. (normal operation)

Topic: __redhat_strimzi_canary	TopicId: ixJjbxxYTTi_EiWQwmZFmA	PartitionCount: 3	ReplicationFactor: 3	Configs: min.insync.replicas=2,segment.bytes=16384,retention.ms=600000,message.format.version=3.0-IV1,max.message.bytes=1048588
	Topic: __redhat_strimzi_canary	Partition: 0	Leader: 0	Replicas: 0,1,2	Isr: 1,0,2
	Topic: __redhat_strimzi_canary	Partition: 1	Leader: 1	Replicas: 1,2,0	Isr: 1,0,2
	Topic: __redhat_strimzi_canary	Partition: 2	Leader: 2	Replicas: 2,0,1	Isr: 1,0,2
I1123 14:34:24.198305       1 producer.go:108] Message sent: partition=0, offset=16134, duration=3 ms
I1123 14:34:24.201262       1 consumer.go:220] Message received: value={ProducerID:__redhat_strimzi_canary_client, MessageID:2012, Timestamp:1669214064198}, partition=1, offset=16135, duration=3 ms
I1123 14:34:24.201318       1 producer.go:108] Message sent: partition=1, offset=16135, duration=3 ms
I1123 14:34:24.204505       1 producer.go:108] Message sent: partition=2, offset=16137, duration=3 ms
I1123 14:34:24.204506       1 consumer.go:220] Message received: value={ProducerID:__redhat_strimzi_canary_client, MessageID:2013, Timestamp:1669214064201}, partition=2, offset=16137, duration=3 ms
I1123 14:34:29.198862       1 consumer.go:220] Message received: value={ProducerID:__redhat_strimzi_canary_client, MessageID:2014, Timestamp:1669214069195}, partition=0, offset=16135, duration=3 ms
I1123 14:34:29.199983       1 producer.go:108] Message sent: partition=0, offset=16135, duration=4 ms
I1123 14:34:29.204179       1 consumer.go:220] Message received: value={ProducerID:__redhat_strimzi_canary_client, MessageID:2015, Timestamp:1669214069199}, partition=1, offset=16136, duration=5 ms
I1123 14:34:29.204286       1 producer.go:108] Message sent: partition=1, offset=16136, duration=5 ms
I1123 14:34:29.207707       1 consumer.go:220] Message received: value={ProducerID:__redhat_strimzi_canary_client, MessageID:2016, Timestamp:1669214069204}, partition=2, offset=16138, duration=3 ms
I1123 14:34:29.207992       1 producer.go:108] Message sent: partition=2, offset=16138, duration=3 ms
I1123 14:34:34.199204       1 consumer.go:220] Message received: value={ProducerID:__redhat_strimzi_canary_client, MessageID:2017, Timestamp:1669214074196}, partition=0, offset=16136, duration=3 ms

At 2./3. (one broker restarted - normal operation from two brokers)

Topic: __redhat_strimzi_canary	TopicId: ixJjbxxYTTi_EiWQwmZFmA	PartitionCount: 3	ReplicationFactor: 3	Configs: min.insync.replicas=2,segment.bytes=16384,retention.ms=600000,message.format.version=3.0-IV1,max.message.bytes=1048588
	Topic: __redhat_strimzi_canary	Partition: 0	Leader: 0	Replicas: 0,1,2	Isr: 1,0,2
	Topic: __redhat_strimzi_canary	Partition: 1	Leader: 1	Replicas: 1,2,0	Isr: 1,0,2
	Topic: __redhat_strimzi_canary	Partition: 2	**Leader: 0**	Replicas: 2,0,1	Isr: 1,0,2
I1123 14:34:59.198831       1 producer.go:108] Message sent: partition=0, offset=16139, duration=3 ms
I1123 14:34:59.201451       1 consumer.go:220] Message received: value={ProducerID:__redhat_strimzi_canary_client, MessageID:2030, Timestamp:1669214099198}, partition=1, offset=16140, duration=3 ms
I1123 14:34:59.201517       1 producer.go:108] Message sent: partition=1, offset=16140, duration=3 ms
I1123 14:34:59.204108       1 consumer.go:220] Message received: value={ProducerID:__redhat_strimzi_canary_client, MessageID:2031, Timestamp:1669214099201}, partition=2, offset=16142, duration=3 ms
I1123 14:34:59.204126       1 producer.go:108] Message sent: partition=2, offset=16142, duration=3 ms
I1123 14:35:04.198943       1 producer.go:108] Message sent: partition=0, offset=16140, duration=3 ms
I1123 14:35:04.198943       1 consumer.go:220] Message received: value={ProducerID:__redhat_strimzi_canary_client, MessageID:2032, Timestamp:1669214104195}, partition=0, offset=16140, duration=3 ms
I1123 14:35:04.201458       1 consumer.go:220] Message received: value={ProducerID:__redhat_strimzi_canary_client, MessageID:2033, Timestamp:1669214104198}, partition=1, offset=16141, duration=3 ms
I1123 14:35:04.201563       1 producer.go:108] Message sent: partition=1, offset=16141, duration=3 ms
I1123 14:35:04.204153       1 consumer.go:220] Message received: value={ProducerID:__redhat_strimzi_canary_client, MessageID:2034, Timestamp:1669214104201}, partition=2, offset=16143, duration=3 ms
I1123 14:35:04.204224       1 producer.go:108] Message sent: partition=2, offset=16143, duration=3 ms

At 5./6. (partition leadership reverts; produce latencies normal; consume latencies for partition 2 extended)

Topic: __redhat_strimzi_canary	TopicId: ixJjbxxYTTi_EiWQwmZFmA	PartitionCount: 3	ReplicationFactor: 3	Configs: min.insync.replicas=2,segment.bytes=16384,retention.ms=600000,message.format.version=3.0-IV1,max.message.bytes=1048588
	Topic: __redhat_strimzi_canary	Partition: 0	Leader: 0	Replicas: 0,1,2	Isr: 1,0,2
	Topic: __redhat_strimzi_canary	Partition: 1	Leader: 1	Replicas: 1,2,0	Isr: 1,0,2
	Topic: __redhat_strimzi_canary	Partition: 2	Leader: 2	Replicas: 2,0,1	Isr: 1,0,2
I1123 14:39:14.198053       1 producer.go:108] Message sent: partition=0, offset=16188, duration=3 ms
I1123 14:39:14.201396       1 consumer.go:220] Message received: value={ProducerID:__redhat_strimzi_canary_client, MessageID:2180, Timestamp:1669214354198}, partition=1, offset=16189, duration=3 ms
I1123 14:39:14.201512       1 producer.go:108] Message sent: partition=1, offset=16189, duration=3 ms
I1123 14:39:14.207090       1 producer.go:108] Message sent: partition=2, offset=16191, duration=6 ms
I1123 14:39:15.200454       1 consumer.go:220] Message received: value={ProducerID:__redhat_strimzi_canary_client, MessageID:2181, Timestamp:1669214354201}, partition=2, offset=16191, duration=**999** ms
I1123 14:39:19.198376       1 consumer.go:220] Message received: value={ProducerID:__redhat_strimzi_canary_client, MessageID:2182, Timestamp:1669214359195}, partition=0, offset=16189, duration=3 ms
I1123 14:39:19.198534       1 producer.go:108] Message sent: partition=0, offset=16189, duration=3 ms
I1123 14:39:19.207277       1 consumer.go:220] Message received: value={ProducerID:__redhat_strimzi_canary_client, MessageID:2183, Timestamp:1669214359198}, partition=1, offset=16190, duration=9 ms
I1123 14:39:19.207381       1 producer.go:108] Message sent: partition=1, offset=16190, duration=9 ms
I1123 14:39:19.217006       1 producer.go:108] Message sent: partition=2, offset=16192, duration=9 ms
I1123 14:39:20.202750       1 consumer.go:220] Message received: value={ProducerID:__redhat_strimzi_canary_client, MessageID:2184, Timestamp:1669214359207}, partition=2, offset=16192, duration=**995** ms
I1123 14:39:24.224732       1 consumer.go:220] Message received: value={ProducerID:__redhat_strimzi_canary_client, MessageID:2185, Timestamp:1669214364218}, partition=0, offset=16190, duration=6 ms
I1123 14:39:24.224776       1 producer.go:108] Message sent: partition=0, offset=16190, duration=6 ms
I1123 14:39:24.227775       1 consumer.go:220] Message received: value={ProducerID:__redhat_strimzi_canary_client, MessageID:2186, Timestamp:1669214364224}, partition=1, offset=16191, duration=3 ms
I1123 14:39:24.228010       1 producer.go:108] Message sent: partition=1, offset=16191, duration=4 ms
I1123 14:39:24.232457       1 producer.go:108] Message sent: partition=2, offset=16193, duration=4 ms
I1123 14:39:25.228171       1 consumer.go:220] Message received: value={ProducerID:__redhat_strimzi_canary_client, MessageID:2187, Timestamp:1669214364228}, partition=2, offset=16193, duration=**1000** ms
I1123 14:39:29.199843       1 consumer.go:220] Message received: value={ProducerID:__redhat_strimzi_canary_client, MessageID:2188, Timestamp:1669214369196}, partition=0, offset=16191, duration=3 ms
I1123 14:39:29.200046       1 producer.go:108] Message sent: partition=0, offset=16191, duration=4 ms
I1123 14:39:29.203503       1 consumer.go:220] Message received: value={ProducerID:__redhat_strimzi_canary_client, MessageID:2189, Timestamp:1669214369200}, partition=1, offset=16192, duration=3 ms
I1123 14:39:29.203540       1 producer.go:108] Message sent: partition=1, offset=16192, duration=3 ms
I1123 14:39:29.208611       1 producer.go:108] Message sent: partition=2, offset=16194, duration=5 ms
I1123 14:39:30.203502       1 consumer.go:220] Message received: value={ProducerID:__redhat_strimzi_canary_client, MessageID:2190, Timestamp:1669214369203}, partition=2, offset=16194, duration=**1000** ms
I1123 14:39:34.198637       1 consumer.go:220] Message received: value={ProducerID:__redhat_strimzi_canary_client, MessageID:2191, Timestamp:1669214374195}, partition=0, offset=16192, duration=3 ms

At 7. (after about 10 minutes, the extended consume latency on partition 2 disappears)

I1123 14:48:04.198608       1 producer.go:108] Message sent: partition=0, offset=16294, duration=3 ms
I1123 14:48:04.201637       1 consumer.go:220] Message received: value={ProducerID:__redhat_strimzi_canary_client, MessageID:2498, Timestamp:1669214884198}, partition=1, offset=16295, duration=3 ms
I1123 14:48:04.201769       1 producer.go:108] Message sent: partition=1, offset=16295, duration=3 ms
I1123 14:48:04.205099       1 producer.go:108] Message sent: partition=2, offset=16297, duration=4 ms
I1123 14:48:05.202581       1 consumer.go:220] Message received: value={ProducerID:__redhat_strimzi_canary_client, MessageID:2499, Timestamp:1669214884201}, partition=2, offset=16297, duration=**1001** ms
I1123 14:48:09.198154       1 consumer.go:220] Message received: value={ProducerID:__redhat_strimzi_canary_client, MessageID:2500, Timestamp:1669214889195}, partition=0, offset=16295, duration=3 ms
I1123 14:48:09.198207       1 producer.go:108] Message sent: partition=0, offset=16295, duration=3 ms
I1123 14:48:09.201379       1 consumer.go:220] Message received: value={ProducerID:__redhat_strimzi_canary_client, MessageID:2501, Timestamp:1669214889198}, partition=1, offset=16296, duration=3 ms
I1123 14:48:09.201441       1 producer.go:108] Message sent: partition=1, offset=16296, duration=3 ms
I1123 14:48:09.204801       1 producer.go:108] Message sent: partition=2, offset=16298, duration=3 ms
I1123 14:48:11.811706       1 consumer.go:220] Message received: value={ProducerID:__redhat_strimzi_canary_client, MessageID:2502, Timestamp:1669214889201}, partition=2, offset=16298, duration=**2610** ms
I1123 14:48:14.198315       1 consumer.go:220] Message received: value={ProducerID:__redhat_strimzi_canary_client, MessageID:2503, Timestamp:1669214894195}, partition=0, offset=16296, duration=3 ms
I1123 14:48:14.198327       1 producer.go:108] Message sent: partition=0, offset=16296, duration=3 ms
I1123 14:48:14.201631       1 consumer.go:220] Message received: value={ProducerID:__redhat_strimzi_canary_client, MessageID:2504, Timestamp:1669214894198}, partition=1, offset=16297, duration=3 ms
I1123 14:48:14.201789       1 producer.go:108] Message sent: partition=1, offset=16297, duration=3 ms
I1123 14:48:14.205307       1 consumer.go:220] Message received: value={ProducerID:__redhat_strimzi_canary_client, MessageID:2505, Timestamp:1669214894201}, partition=2, offset=16299, duration=**4** ms
I1123 14:48:14.205375       1 producer.go:108] Message sent: partition=2, offset=16299, duration=4 ms
I1123 14:48:19.199372       1 consumer.go:220] Message received: value={ProducerID:__redhat_strimzi_canary_client, MessageID:2506, Timestamp:1669214899196}, partition=0, offset=16297, duration=3 ms
I1123 14:48:19.199592       1 producer.go:108] Message sent: partition=0, offset=16297, duration=3 ms
I1123 14:48:19.202871       1 consumer.go:220] Message received: value={ProducerID:__redhat_strimzi_canary_client, MessageID:2507, Timestamp:1669214899199}, partition=1, offset=16298, duration=3 ms
I1123 14:48:19.203010       1 producer.go:108] Message sent: partition=1, offset=16298, duration=4 ms
I1123 14:48:19.206988       1 producer.go:108] Message sent: partition=2, offset=16300, duration=3 ms
I1123 14:48:19.207043       1 consumer.go:220] Message received: value={ProducerID:__redhat_strimzi_canary_client, MessageID:2508, Timestamp:1669214899203}, partition=2, offset=16300, duration=**4** ms
I1123 14:48:24.198732       1 consumer.go:220] Message received: value={ProducerID:__redhat_strimzi_canary_client, MessageID:2509, Timestamp:1669214904195}, partition=0, offset=16298, duration=3 ms
I1123 14:48:24.198801       1 producer.go:108] Message sent: partition=0, offset=16298, duration=3 ms
I1123 14:48:24.201927       1 consumer.go:220] Message received: value={ProducerID:__redhat_strimzi_canary_client, MessageID:2510, Timestamp:1669214904198}, partition=1, offset=16299, duration=3 ms
I1123 14:48:24.201980       1 producer.go:108] Message sent: partition=1, offset=16299, duration=3 ms
I1123 14:48:24.205481       1 consumer.go:220] Message received: value={ProducerID:__redhat_strimzi_canary_client, MessageID:2511, Timestamp:1669214904201}, partition=2, offset=16301, duration=4 ms
I1123 14:48:24.205519       1 producer.go:108] Message sent: partition=2, offset=16301, duration=4 ms
I1123 14:48:29.198294       1 consumer.go:220] Message received: value={ProducerID:__redhat_strimzi_canary_client, MessageID:2512, Timestamp:1669214909195}, partition=0, offset=16299, duration=3 ms

And here's what it looks like in Prometheus:

Screenshot 2022-11-25 at 11 34 23

@k-wall
Contributor Author

k-wall commented Nov 23, 2022

By examining an RPC trace from the client, I notice that during period 5./6. the consumer is fetching from a follower. I can tell this by logging where the client sends its FetchRequest RPCs. I see all the requests going to broker 0 (fetch blocks for partitions 0 and 2) and broker 1 (fetch blocks for partition 1), but none to broker 2 during this period. As soon as period 7. starts, I see FetchRequests going to all three brokers again (with single fetch blocks).
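The check described above can be sketched in a few lines of Go. This is only an illustration of the reasoning, not canary or Sarama code: `followerFetches` and both maps are hypothetical names, and the data is copied by hand from the trace and the topic description in this report.

```go
package main

import (
	"fmt"
	"sort"
)

// followerFetches returns the partitions that are being fetched from a broker
// other than their current leader.
// fetchTargets maps broker ID -> partitions named in FetchRequests sent to it.
// leaders maps partition -> current leader broker ID.
func followerFetches(fetchTargets map[int32][]int32, leaders map[int32]int32) []int32 {
	var stale []int32
	for broker, partitions := range fetchTargets {
		for _, p := range partitions {
			if leader, ok := leaders[p]; ok && leader != broker {
				stale = append(stale, p)
			}
		}
	}
	sort.Slice(stale, func(i, j int) bool { return stale[i] < stale[j] })
	return stale
}

func main() {
	// Observed during period 5./6.: no FetchRequests reach broker 2.
	fetchTargets := map[int32][]int32{0: {0, 2}, 1: {1}}
	// Leaders after leadership reverted to the preferred replicas.
	leaders := map[int32]int32{0: 0, 1: 1, 2: 2}
	fmt.Println(followerFetches(fetchTargets, leaders)) // prints [2]
}
```

Partition 2 is the only one fetched from a non-leader, which matches the partition showing the extended consume latency.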

I also see the following in the logs at the moment the problem resolves.

[Sarama] 2022/11/23 16:07:47.472933 consumer/broker/0 abandoned in favor of preferred replica broker/2

The reason the client makes this decision is the fact that the metadata has been refreshed. In Sarama this is controlled by config.Metadata.RefreshFrequency which defaults to 10 * time.Minute. The canary is using the default.

It surprises me that consuming from a follower introduces almost 1 second of latency.

The client's interactions with a single broker are single threaded so I am wondering if this is giving rise to additional latency. The client is sending a single FetchRequest for partitions 0 and 2, the broker returns the FetchResponse containing only the record for partition 0. The client won't send out the FetchRequest (for partition 0 and 2 again) until the processing of the partition 0 response is complete. I think this might explain some of the additional latency, but not all.

@k-wall
Contributor Author

k-wall commented Nov 23, 2022

I am wondering about:

  • whether the consumer client should refresh metadata more frequently. Maybe once a minute?
  • whether the canary should be changed: a) use a single consuming client per partition so there can never be contention when the partition leaders are on the same broker. b) don't use consumer groups at all.

@ppatierno WDYT

@showuon
Member

showuon commented Nov 24, 2022

I believe this is a Kafka bug. I think a fetch sent to the wrong follower gets "NOT_LEADER_OR_FOLLOWER", but the client doesn't fall back to the leader when it receives this error.
Recently, someone reported it upstream, but no PR has been opened yet. I'll ask if it can be fixed in v3.4.0.

https://issues.apache.org/jira/browse/KAFKA-14379

@showuon
Member

showuon commented Nov 24, 2022

OK, I found I was wrong, because the canary uses the Sarama client, not the Java client. So maybe there is a similar issue there.

@k-wall
Contributor Author

k-wall commented Nov 24, 2022

thanks @showuon. In the case of the canary the client is Sarama and I don't think it is suffering the same client defect.

In this case, the problem is resolved by the refreshing of the metadata (so the client is respecting the preferred read replica). The client logs:

[Sarama] 2022/11/23 16:07:47.472933 consumer/broker/0 abandoned in favor of preferred replica broker/2

just after the timed metadata refresh (10mins). That's coming from: https://github.com/Shopify/sarama/blob/v1.35.0/consumer.go#L977

I think fixing the canary to refresh its client metadata more frequently is the correct thing to do.

Separately, I am still puzzled why fetching from a follower introduces so much latency in this case (~1000ms). The canary's producer is using RequiredAcks WaitForAll (-1) when publishing. I know that all replicas are in-sync so this should mean that all brokers have the message. The canary is using a SyncProducer and the code is timing the latency between ProduceRequest and ProduceResponse. I know this is short ~5ms. I cannot account for ~990ms.

I1123 14:39:14.207090       1 producer.go:108] Message sent: partition=2, offset=16191, duration=6 ms
I1123 14:39:15.200454       1 consumer.go:220] Message received: value={ProducerID:__redhat_strimzi_canary_client, MessageID:2181, Timestamp:1669214354201}, partition=2, offset=16191, duration=**999** ms

@k-wall
Contributor Author

k-wall commented Nov 24, 2022

Separately, I am still puzzled why fetching from a follower introduces so much latency in this case (~1000ms).

I believe I have answered my own question. What I am seeing is a consequence of how high water mark propagation behaves in partitions with sparse traffic. The leader informs the followers of the high water mark via the HighwaterMarkOffset in the FetchResponse, and it can only advance that value once it knows all in-sync followers have confirmed receipt (by the offset in their next FetchRequest). As there are no other messages going to the partition, the follower does not get the FetchResponse carrying the new high water mark until a full replica.fetch.wait.max.ms (default: 500ms) has elapsed. I experimented by lowering replica.fetch.wait.max.ms to confirm this was the case.

@showuon
Member

showuon commented Nov 25, 2022

@k-wall, nice investigation! I think replica.fetch.wait.max.ms is indeed the reason for the ~1000ms latency.

k-wall added a commit to k-wall/strimzi-canary that referenced this issue Nov 25, 2022
… partition leadership changes.

This minimises the time the canary spends consumer from followers after the partition leadership changes on the broker, and so avoid end
to end latency measure being skewed.

Signed-off-by: kwall <[email protected]>
@k-wall
Contributor Author

k-wall commented Nov 25, 2022

I am proposing #206 to resolve this defect. Earlier I spoke about refactoring the canary not to use a consumer group. Whilst I think this is the right approach, as we intend to rewrite the Canary in Java, I don't think it is worth putting the time into the Go implementation. I believe #206 is a worthwhile improvement that should reduce the worst of the spurious latency measurements.

k-wall added a commit to k-wall/strimzi-canary that referenced this issue Nov 25, 2022
…e partition leadership changes

This minimises the time the canary spends consumer from followers after the partition leadership changes on the broker, and so avoid end
to end latency measure being skewed.

Signed-off-by: kwall <[email protected]>
@ppatierno
Member

@k-wall when you described the problem, this issue I raised in the Sarama community came to my mind: IBM/sarama#1927
Isn't it possible that there is a regression somewhere, so that we don't need to reduce the metadata refresh interval and can instead look forward to a fix in Sarama itself?

@k-wall
Contributor Author

k-wall commented Dec 6, 2022

@ppatierno I don't think it is a Sarama client issue. When the Sarama client refreshes the metadata (I mean its own timed refresh), the problem goes away immediately. That's the first time the client learns that the leadership has moved. It reacts properly to that signal IMO.

[Sarama] 2022/11/23 16:07:47.472933 consumer/broker/0 abandoned in favor of preferred replica broker/2

I think this is Kafka working as designed. There's no Kafka mechanism for the client to learn of the leadership change until it refreshes, right? The PreferredReadReplica mechanism is only applicable when using rack awareness, which is not the case for the canary.

@ppatierno
Member

@k-wall but isn't the behaviour you are seeing exactly the same as the one described in the issue I opened in the Sarama repo?
Where do they differ? Because if it's the same, it was fixed and is now not working again.

@k-wall
Contributor Author

k-wall commented Dec 6, 2022

No, this is different to your report:

In IBM/sarama#1927 (comment) you said "yes you are right (of course! ;-)) even after a metadata refresh nothing is changed."

That's not the case here. The timed metadata refresh is resolving the issue - but we suffer potentially minutes of duff latency measurements until it fires. My proposed change triggers the metadata refresh early (so we don't have to wait for the next timed one and don't suffer the duff data).
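One way such an early trigger could be decided is by comparing successive snapshots of partition leadership. The sketch below is hypothetical and is not taken from the proposed change: `leadersChanged` and the snapshot maps are illustrative names, with the data copied from the topic descriptions earlier in this thread. In a real client the reaction would be an explicit metadata refresh (Sarama's `Client` exposes `RefreshMetadata` for this).

```go
package main

import "fmt"

// leadersChanged reports whether any partition has a different leader in cur
// than in prev, or whether the set of partitions itself differs.
// Both maps are partition -> leader broker ID.
func leadersChanged(prev, cur map[int32]int32) bool {
	if len(prev) != len(cur) {
		return true
	}
	for partition, leader := range cur {
		if old, ok := prev[partition]; !ok || old != leader {
			return true
		}
	}
	return false
}

func main() {
	// Snapshot while broker 2 was restarting (step 3).
	during := map[int32]int32{0: 0, 1: 1, 2: 0}
	// Snapshot after leadership reverted to the preferred replica (step 5).
	after := map[int32]int32{0: 0, 1: 1, 2: 2}

	if leadersChanged(during, after) {
		fmt.Println("leadership changed: refresh consumer metadata now")
	}
}
```

Reacting to the snapshot difference bounds the stale period by how often leadership is polled rather than by the 10 minute timed refresh.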

k-wall added a commit to k-wall/strimzi-canary that referenced this issue Dec 7, 2022
…e partition leadership changes

This minimises the time the canary spends consumer from followers after the partition leadership changes on the broker, and so avoid end
to end latency measure being skewed.

Signed-off-by: kwall <[email protected]>
ppatierno added a commit to ppatierno/strimzi-canary that referenced this issue Dec 20, 2022
ppatierno added a commit that referenced this issue Dec 20, 2022
Signed-off-by: Paolo Patierno <[email protected]>