feat: add option to propagate OffsetOutOfRange error #2252
Conversation
Recreation of #1183 because it is still relevant and needed. Nobody was against this change, but it ended up stalled and was closed.
@dkolistratova hmm, whilst it may or may not be desirable, it seems like we match the behaviour of the Java kafka-client at the moment? Isn't this PR adding functionality similar to unmerged apache/kafka#9006 that came out of KAFKA-3370?
@dnwe we do not reset the offset to the nearest value, we just add a configurable option to propagate the error to the calling code.
When consuming a partition using a consumer group, the code handles ErrOffsetOutOfRange errors by resetting to the "initial" position, as specified by the user (i.e. either the oldest or newest available offset). This, however, can be very dangerous. Say a consumer has consumed up to offset 100 on replica A, but replica B has only replicated up to offset 99 due to temporary under-replication. During a rebalance, sarama can end up with an offset out-of-range error if it fetches partition metadata from replica B, since the desired offset of 100 is greater than the newest offset of 99. The sarama consumer would reset the offset in this case, which can cause reprocessing of old data, especially if the initial offset is configured as "oldest".

This commit adds a config flag to disable this automatic reset. In the above case, the consumer will be able to proceed normally once the data replicates.
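For illustration, here is a minimal sketch of how a caller might use the new option; it is not code from this PR. The field name Consumer.Group.ResetInvalidOffsets comes from the diff below, while the broker address, topic, group name, and handler are placeholders, and exactly how the propagated ErrOffsetOutOfRange surfaces to the caller (the return value of Consume versus the Errors() channel) may vary by Sarama version.

package main

import (
	"context"
	"errors"
	"log"

	"github.com/Shopify/sarama" // module path at the time of this PR; since moved to github.com/IBM/sarama
)

// exampleHandler is a placeholder sarama.ConsumerGroupHandler for this sketch.
type exampleHandler struct{}

func (exampleHandler) Setup(sarama.ConsumerGroupSession) error   { return nil }
func (exampleHandler) Cleanup(sarama.ConsumerGroupSession) error { return nil }
func (exampleHandler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
	for msg := range claim.Messages() {
		sess.MarkMessage(msg, "")
	}
	return nil
}

func main() {
	cfg := sarama.NewConfig()
	// Opt out of the automatic reset so an out-of-range offset is reported
	// instead of silently rewinding to Consumer.Offsets.Initial.
	cfg.Consumer.Group.ResetInvalidOffsets = false
	cfg.Consumer.Offsets.Initial = sarama.OffsetOldest

	group, err := sarama.NewConsumerGroup([]string{"localhost:9092"}, "example-group", cfg)
	if err != nil {
		log.Fatalln(err)
	}
	defer group.Close()

	for {
		// With the reset disabled, an ErrOffsetOutOfRange is expected to reach
		// the caller rather than being handled internally, so the application
		// can decide whether to rewind, alert, or wait for replication.
		if err := group.Consume(context.Background(), []string{"example-topic"}, exampleHandler{}); err != nil {
			if errors.Is(err, sarama.ErrOffsetOutOfRange) {
				log.Printf("offset out of range, handling manually: %v", err)
			}
			return
		}
	}
}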
@dnwe we've dealt with the CLA, can it be reviewed/discussed?
@dnwe Looks like the unmerged apache/kafka#9006 would reset offsets to the nearest value, but that also might not be what we need. What this PR adds is more similar to OffsetResetStrategy.None.
@@ -499,6 +508,7 @@ func NewConfig() *Config {
 	c.Consumer.Group.Rebalance.Timeout = 60 * time.Second
 	c.Consumer.Group.Rebalance.Retry.Max = 4
 	c.Consumer.Group.Rebalance.Retry.Backoff = 2 * time.Second
+	c.Consumer.Group.ResetInvalidOffsets = false
Comment says "default to true" but it's not reflected here.
This parameter is dangerous; it should be true by default.
If it is false, invalid offsets are never reset, which can cause an infinite loop of closing and re-adding subscriptions.
A number of people have reported issues with the change in default behaviour that was introduced in #2252, and the comment for this configuration value had suggested it would retain the behaviour of previous Sarama versions. This PR puts the default behaviour back to resetting to `c.Consumer.Offsets.Initial` when the server returns an out-of-range error on a Fetch. Contributes-to: #2342
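To make the restored default concrete, here is a simplified, hypothetical sketch of the decision involved; the function name and signature are inventions for illustration, not Sarama's internal code.

package example

import "github.com/Shopify/sarama"

// handleOffsetOutOfRange is a hypothetical illustration of what happens when a
// Fetch returns ErrOffsetOutOfRange: either rewind to the configured initial
// offset, or propagate the error to the calling code.
func handleOffsetOutOfRange(cfg *sarama.Config, initialOffset int64) (int64, error) {
	if cfg.Consumer.Group.ResetInvalidOffsets {
		// Restored default: reset to Consumer.Offsets.Initial and keep consuming.
		return initialOffset, nil
	}
	// Opt-in behaviour from #2252: surface the error to the caller.
	return 0, sarama.ErrOffsetOutOfRange
}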