Offsets are reset on Azure EventHubs when a consumer starts #2677

Closed
mfamador opened this issue Oct 18, 2023 · 7 comments · Fixed by #2700
Labels
needs-investigation Issues that require followup from maintainers

Comments

@mfamador

mfamador commented Oct 18, 2023

Description

Starting with version 1.41.0, every time a consumer in a consumer group starts or leaves, all running consumers from the same consumer group consume all messages all over again. It seems the offsets are being reset and aren't committed again.
I couldn't reproduce the issue on a regular Kafka cluster; it apparently only occurs with Azure Event Hubs, hence Kafka version 1.0.0.
Reverting to 1.40.1, everything works as expected.

Versions
Sarama: 1.41.0, 1.41.1, 1.41.2, 1.41.3
Kafka: 1.0.0
Go: 1.21.3
Configuration
config := sarama.NewConfig()
config.Net.TLS.Enable = true
config.Net.SASL.Enable = true
config.Net.SASL.User = "$ConnectionString"
config.Net.SASL.Password = connectionString
config.Consumer.Offsets.Initial = sarama.OffsetOldest
config.Consumer.Offsets.AutoCommit.Enable = true
config.Consumer.Offsets.AutoCommit.Interval = time.Second
config.Version = sarama.V1_0_0_0
brokerAddresses := []string{brokerAddress}
consumerGroup := "foo"
consumer, err := sarama.NewConsumerGroup(brokerAddresses, consumerGroup, config)
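
For reference, a minimal sketch of the consuming side that would produce the log lines below (the handler, topic name, and exact log format are assumptions; the reporter's actual code wasn't shared):

type handler struct{}

func (handler) Setup(sarama.ConsumerGroupSession) error   { return nil }
func (handler) Cleanup(sarama.ConsumerGroupSession) error { return nil }

func (handler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
	for msg := range claim.Messages() {
		fmt.Printf("Message received: key=%s, value=%s, partition=%d, offset=%d\n",
			msg.Key, msg.Value, msg.Partition, msg.Offset)
		sess.MarkMessage(msg, "") // marked offsets are committed by the auto-commit loop
	}
	return nil
}

// Consume returns whenever a rebalance happens, so it must be called in a loop.
for {
	if err := consumer.Consume(context.Background(), []string{"test"}, handler{}); err != nil {
		log.Fatalln(err)
	}
}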
Logs


Message received: key=, value={"a":"35fb0d14-a2d8-43a6-b5ad-2bd29dfd4741"}, parition=0, offset=0
Message received: key=, value={"a":"ce7ed018-70d3-43fb-9fed-e5fa45e93228"}, parition=0, offset=1
Message received: key=, value={"a":"3d9ccaff-1011-4ff8-bb02-46490f626c13"}, parition=0, offset=2
Message received: key=, value={"a":"7767f7a4-fbfa-43d5-ab0b-05f399095361"}, parition=0, offset=3
Message received: key=, value={"a":"a9540bf2-0be2-406b-8fba-a6c1fd3f6185"}, parition=0, offset=4
Message received: key=, value={"a":"edc0f104-31a1-4ee7-b82a-6e6fc982585a"}, parition=0, offset=5
Message received: key=, value={"a":"150fb310-28ff-42a7-8e63-bedd00419882"}, parition=0, offset=6
Message received: key=, value={"a":"a5264f4f-0ec1-4d53-a2ae-8371d0668af1"}, parition=0, offset=7
Message received: key=, value={"a":"084179c8-a519-4e3d-933b-e3095d22158c"}, parition=0, offset=8
Message received: key=, value={"a":"75d47dc6-768c-4775-8b0d-04ad8f6de55a"}, parition=0, offset=9
Message received: key=, value={"a":"de31cfed-5f03-4c56-9810-4c905f93a343"}, parition=0, offset=10
Message received: key=, value={"a":"0afc2358-983a-4559-9edd-5ae295484611"}, parition=0, offset=11
Message received: key=, value={"a":"7ee8c3b4-a6dc-4fdb-a24b-0b70f525a5ee"}, parition=0, offset=12
Message received: key=, value={"a":"b9242862-4741-40c7-8486-f3f436f65883"}, parition=0, offset=13
. . .

Restarting the consumer, or adding a new one, makes it consume everything all over again:


Message received: key=, value={"a":"35fb0d14-a2d8-43a6-b5ad-2bd29dfd4741"}, parition=0, offset=0
Message received: key=, value={"a":"ce7ed018-70d3-43fb-9fed-e5fa45e93228"}, parition=0, offset=1
Message received: key=, value={"a":"3d9ccaff-1011-4ff8-bb02-46490f626c13"}, parition=0, offset=2
Message received: key=, value={"a":"7767f7a4-fbfa-43d5-ab0b-05f399095361"}, parition=0, offset=3
Message received: key=, value={"a":"a9540bf2-0be2-406b-8fba-a6c1fd3f6185"}, parition=0, offset=4
Message received: key=, value={"a":"edc0f104-31a1-4ee7-b82a-6e6fc982585a"}, parition=0, offset=5
Message received: key=, value={"a":"150fb310-28ff-42a7-8e63-bedd00419882"}, parition=0, offset=6
Message received: key=, value={"a":"a5264f4f-0ec1-4d53-a2ae-8371d0668af1"}, parition=0, offset=7
Message received: key=, value={"a":"084179c8-a519-4e3d-933b-e3095d22158c"}, parition=0, offset=8
Message received: key=, value={"a":"75d47dc6-768c-4775-8b0d-04ad8f6de55a"}, parition=0, offset=9
Message received: key=, value={"a":"de31cfed-5f03-4c56-9810-4c905f93a343"}, parition=0, offset=10
Message received: key=, value={"a":"0afc2358-983a-4559-9edd-5ae295484611"}, parition=0, offset=11
Message received: key=, value={"a":"7ee8c3b4-a6dc-4fdb-a24b-0b70f525a5ee"}, parition=0, offset=12
Message received: key=, value={"a":"b9242862-4741-40c7-8486-f3f436f65883"}, parition=0, offset=13
. . . 

Additional Context

Not sure if it's related, but on 1.40.1, if we try to use version 2.0.0 on an Event Hub we cannot even connect; we need to specify 1.0.0.
Starting from 1.41.0, though, we can set either 1.0.0 or 2.0.0 and connect just the same, with the aforementioned problem of resetting the offsets, of course.

@mfamador
Author

mfamador commented Oct 18, 2023

It can also be reproduced, though only occasionally and not nearly as often as on Event Hubs, with Docker using Kafka 1.0.0 when the topic has multiple partitions (perhaps a race condition):

Configuration

topic := "test"
consumerGroup := "bar"
config.Net.TLS.Enable = false
config.Net.SASL.Enable = false
config.Consumer.Offsets.Initial = sarama.OffsetOldest
config.Consumer.Offsets.AutoCommit.Enable = true
config.Consumer.Offsets.AutoCommit.Interval = time.Second
config.Version = sarama.V1_0_0_0
brokerAddresses := []string{"localhost:9092"}
docker network create kafka-net
docker run -d --name zookeeper --network kafka-net -p 2181:2181 wurstmeister/zookeeper
docker run -d --name kafka --network kafka-net -e KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181 -e KAFKA_ADVERTISED_HOST_NAME=localhost -e KAFKA_ADVERTISED_PORT=9092 -p 9092:9092 wurstmeister/kafka:1.0.0

Create topic with multiple partitions:

docker exec -it kafka bash
> kafka-topics.sh --create  --zookeeper zookeeper --replication-factor 1 --partitions 3  --topic test

Publishing a few messages while adding and removing consumers sometimes also resets the offsets and re-reads all messages, but not as often as on Event Hubs.

@mfamador
Author

To reproduce it with Docker and Kafka 1.0.0, the chances are better if we start 3 consumers, publish a few messages, wait around 10 minutes, and stop one of the consumers. Almost certainly the offsets will be reset and the other consumers will consume all messages again.

@prestona
Member

prestona commented Nov 1, 2023

I've had a dig through the commits between 1.40.1 and 1.41.0. There are no obvious changes to the consumer group logic (or at least I didn't spot any). There are, however, quite a few changes to support later versions of individual Kafka protocol flows.

It looks like #2555 means that setting version sarama.V1_0_0_0 will now send a version 2 offset commit request, whereas prior to this change a version 1 request would have been sent. The difference between these two request versions is the introduction of a retention time field. The retention time is configurable via Config.Consumer.Offsets.Retention, with the default being zero.

Looking at the 1.0.x Kafka client code, it uses a default of -1 if no value is specified:
https://github.com/apache/kafka/blob/c0518aa65f25317eca0c1da4d350f549d35b8536/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java#L117. This suggests that the broker may interpret a value of 0 as "don't retain this offset" (or perhaps only retain it for a short period of time). I also note that the retention time field was dropped in version 5 of the protocol, which I think corresponds to Kafka 2.1.0.
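
To make the wire-level difference concrete, here is roughly what changed (field names are from Sarama's own OffsetCommitRequest type; the literal values are illustrative, not taken from a captured request):

// v1 (what Sarama <= 1.40.1 sent at protocol V1_0_0_0): no top-level
// retention field at all, so the broker default applies.
// v2 (what Sarama >= 1.41.0 sends at protocol V1_0_0_0): carries
// RetentionTime, which ends up as 0 when Config.Consumer.Offsets.Retention
// is left unset.
req := &sarama.OffsetCommitRequest{
	Version:       2,
	ConsumerGroup: "foo",
	RetentionTime: 0, // plausibly read by the broker as "expire these offsets immediately"
}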

@mfamador - if it's not inconvenient, I'd be interested to know whether setting Config.Consumer.Offsets.Retention to a more reasonable duration stops the problem from occurring?
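
Concretely, the suggested experiment is a one-line configuration change (the 200h figure is the value tried below; anything non-zero and comfortably long should behave the same):

// Workaround under test: override Sarama's zero-value default so the
// v2 OffsetCommitRequest carries an explicit retention time instead of 0.
config.Consumer.Offsets.Retention = 200 * time.Hour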

@mfamador
Author

mfamador commented Nov 1, 2023

@prestona, I guess that's it: setting config.Consumer.Offsets.Retention to 0 or -1 keeps the problem, but setting it to something like 200*time.Hour stops the offsets from being reset.

@prestona
Member

prestona commented Nov 1, 2023

Thanks for the confirmation @mfamador! I'll look at coding up a fix.

@prestona
Member

prestona commented Nov 2, 2023

Reproduced using Kafka v1.1.1, and the consumergroups Sarama example:

go run main.go -brokers localhost:9092 -topics testtopic -group testgroup -version 1.1.1

It appears that the broker cleans up expired offsets asynchronously, so I had to wait 5-10 minutes for the offsets to expire.

The broker code treats a retention time of -1 as meaning "use the default" here.

prestona added a commit to prestona/sarama that referenced this issue Nov 2, 2023
The retention time field of the offset commit request uses -1 to mean
"use the broker's default". Sarama uses the value 0 in the
`Config.Consumer.Offsets.Retention` field to mean "use the broker's
default". Ensure that Sarama's default (0) is correctly mapped to the
broker's default (-1).

Fixes: IBM#2677
Signed-off-by: Adrian Preston <[email protected]>
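
In essence, the fix maps Sarama's sentinel onto the protocol's when the commit request is built; a sketch of the idea (not the verbatim patch) looks like:

// When building an OffsetCommitRequest at v2 or later, translate Sarama's
// "use broker default" sentinel (0) into the protocol's sentinel (-1).
if conf.Consumer.Offsets.Retention == 0 {
	req.RetentionTime = -1 // broker applies its offsets.retention.minutes default
} else {
	req.RetentionTime = int64(conf.Consumer.Offsets.Retention / time.Millisecond)
}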
@mfamador
Author

mfamador commented Nov 2, 2023

To confirm: I've tested your fix with Azure Event Hubs and it works perfectly!

dnwe closed this as completed in #2700 Nov 2, 2023
dnwe pushed a commit that referenced this issue Nov 2, 2023
The retention time field of the offset commit request uses -1 to mean
"use the broker's default". Sarama uses the value 0 in the
`Config.Consumer.Offsets.Retention` field to mean "use the broker's
default". Ensure that Sarama's default (0) is correctly mapped to the
broker's default (-1).

Fixes: #2677

Signed-off-by: Adrian Preston <[email protected]>