Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow the Consumer to disable auto-commit offsets #1164

Merged
merged 9 commits into from
Nov 22, 2019
Merged

Conversation

kjelle
Copy link
Contributor

@kjelle kjelle commented Sep 6, 2018

Adding configuration Consumer.Offsets.Enable (default true)
It blocks the call to flushToBroker in OffsetManager

@snigle
Copy link

snigle commented Oct 16, 2018

Hi, I also need to disable the autocommit but I need to commit when I want and at closing.
Do you have the same need ?
If yes this patch should be enough ? #1193

@kjelle
Copy link
Contributor Author

kjelle commented Feb 27, 2019

Given the new consumer groups one also needs to escape in the Close() of the offset manager as it requires MainLoop to run...
I updated my patch @snigle

@kjelle
Copy link
Contributor Author

kjelle commented Aug 24, 2019

@taiyang-li I'll merge upstream and see if I is compatible with the latest tag, 1.23.1 - if so, I'll reissue this PR.

@ghost ghost added the cla-needed label Aug 24, 2019
@kjelle
Copy link
Contributor Author

kjelle commented Aug 24, 2019

cla is signed. im unsure why only one of the travis builds fail, and not all..

@varun06
Copy link
Contributor

varun06 commented Sep 16, 2019

hey @kjelle
even though the change is simple, I feel that there should be a test for it, don't want an edge case to pop up.

is it feasible to write a test for it?

config.go Outdated Show resolved Hide resolved
@kjelle
Copy link
Contributor Author

kjelle commented Nov 15, 2019

is it feasible to write a test for it?

I am adding tests to check if setting the Consumer.Offsets.AutoCommit.Enable gives the expected results.

…mit.Enable. Renamed Consumer.Offsets.CommitInteval to Consumer.Offsets.AutoCommit.Inteval
@kjelle
Copy link
Contributor Author

kjelle commented Nov 18, 2019

@varun06 I discovered some problems while writing the unit-tests. Updates are in kjelle:master repo. Tests are good. I will run it in our production line to see how it behaves. It should not differ.

kjelle referenced this pull request in kjelle/sarama Nov 18, 2019
@varun06
Copy link
Contributor

varun06 commented Nov 19, 2019

@kjelle That's why I asked for test :)

Thanks for doing that and finding the issue.

Copy link
Contributor

@varun06 varun06 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks good to me.

@varun06
Copy link
Contributor

varun06 commented Nov 19, 2019

@sam-obeid can you please have a look again?

@kjelle
Copy link
Contributor Author

kjelle commented Nov 19, 2019

@varun06 given the amount of commits I would suggest a merge squash when it comes to that :)

@varun06 varun06 merged commit 72a629d into IBM:master Nov 22, 2019
@varun06
Copy link
Contributor

varun06 commented Nov 22, 2019

Done, Please test and let me know how it goes.

souravdotsaha added a commit to souravdotsaha/sarama that referenced this pull request Nov 22, 2019
@@ -338,8 +338,15 @@ type Config struct {
// offsets. This currently requires the manual use of an OffsetManager
// but will eventually be automated.
Offsets struct {
// How frequently to commit updated offsets. Defaults to 1s.
CommitInterval time.Duration
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ohh shit, this was a breaking change, i.e: it breaks people using sarama-cluster with the latest version of sarama.

../../../../go/pkg/mod/github.com/bsm/[email protected]+incompatible/consumer.go:452:59: c.client.config.Config.Consumer.Offsets.CommitInterval undefined (type struct { AutoCommit struct { Enable bool; Interval time.Duration }; Initial int64; Retention time.Duration; Retry struct { Max int } } has no field or method CommitInterval)

func (om *offsetManager) flushToBroker() {
if !om.conf.Consumer.Offsets.AutoCommit.Enable {
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Glad this line is fixed, it actually blocks all calls to Commit(), however sometimes we do wish to commit manually.

@kjelle
Copy link
Contributor Author

kjelle commented Oct 28, 2020

I'm sorry if this PR caused a lot of hazzle. I commit offsets manually by making my own offset manager sending a sarama.OffsetCommitRequest with my own block containing topic, partition and offset. I then send it to the correct broker coordinating that partition. If Sarama now supports manual offset commit I could move to that instead.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

7 participants