
Wait for messages to be processed before committing during shutdown #37

Merged
wvanbergen merged 2 commits into master from fix_offset_committing on Mar 9, 2015

Conversation

wvanbergen
Owner

Previously we would immediately try to commit when finalizing a partition. Now we wait until all the offsets we have read from Kafka have been processed, up to a given timeout.

@bayan any chance you could try this branch and see if you run into the problem?

This fixes #35. @eapache
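
For context, a minimal sketch of the waiting behaviour described above. The function and channel names here are hypothetical illustrations, not the library's actual internals:

import (
	"fmt"
	"time"
)

// waitForProcessing blocks until the highest offset handed to the application
// (lastConsumed) has been reported back as processed, or gives up after the
// timeout. Only then is it safe to commit during shutdown.
func waitForProcessing(lastConsumed int64, processed <-chan int64, timeout time.Duration) error {
	deadline := time.After(timeout)
	lastProcessed := int64(-1)
	for lastProcessed < lastConsumed {
		select {
		case offset := <-processed:
			if offset > lastProcessed {
				lastProcessed = offset
			}
		case <-deadline:
			return fmt.Errorf("timed out after %v waiting for messages to be processed", timeout)
		}
	}
	return nil // all consumed offsets have been processed; safe to commit
}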

@wvanbergen wvanbergen force-pushed the fix_offset_committing branch from 08bc5b7 to d929679 on March 8, 2015, 03:52
@eapache
Collaborator

eapache commented Mar 8, 2015

LGTM, but that's not a super-strong endorsement.

@wvanbergen wvanbergen force-pushed the fix_offset_committing branch from d929679 to 83025b2 on March 8, 2015, 12:19
@wvanbergen wvanbergen force-pushed the fix_offset_committing branch from 83025b2 to 0829a58 on March 8, 2015, 12:21
@wvanbergen
Owner Author

@bayan You can specify a processing timeout:

config := consumergroup.NewConfig()
config.ChannelBufferSize = 256 // default
config.Offsets.ProcessingTimeout = 60 * time.Second  // default
cg, err := consumergroup.JoinConsumerGroup(..., config)
...

If your message processing usually takes a long time, you have to adjust the 60-second default accordingly. With a channel buffer size of 256 and an average message processing time of 100 ms, your ProcessingTimeout should be 25.6 seconds or higher.
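
As a rough illustration of that rule of thumb (back-of-the-envelope arithmetic mirroring the numbers above, not library code):

channelBufferSize := 256
avgProcessingTime := 100 * time.Millisecond

// Worst case at shutdown: a full channel buffer still has to be drained,
// one message at a time, before the final commit can happen.
minProcessingTimeout := time.Duration(channelBufferSize) * avgProcessingTime
fmt.Println(minProcessingTimeout) // 25.6s — set Offsets.ProcessingTimeout to at least this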

wvanbergen added a commit that referenced this pull request Mar 9, 2015
Wait for messages to be processed before committing during shutdown
@wvanbergen wvanbergen merged commit 15a566f into master Mar 9, 2015
@wvanbergen wvanbergen deleted the fix_offset_committing branch March 9, 2015 20:12
@bayan

bayan commented Mar 10, 2015

Thank you and sorry about the delay.

I originally had some problems where it appeared as though starting one consumer would halt the others, as well as some other weird behaviour.

So I rewrote a simple producer/consumer example today to try to isolate the issue, and it appears to work fine.

@wvanbergen Can you please confirm your formula for setting the ProcessingTimeout?
By your example it looks like: ChannelBufferSize * AverageProcessingTimeInSeconds

The messages I receive are batch file paths, so each one takes anywhere from a few seconds up to several minutes to process. I have been setting the ChannelBufferSize to 1 or even 0.

@wvanbergen
Owner Author

Yup, more precisely it's (ChannelBufferSize + 1) * AverageProcessingTimeInSeconds.

In your case I'd set ChannelBufferSize to 0 (you don't have much performance to gain by buffering), and set the timeout high enough to be able to finish the message that is currently being processed, e.g. 10 minutes.
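
In other words, something along these lines (a sketch using the same config fields as the snippet above; the values are just the suggestion from this comment):

config := consumergroup.NewConfig()
config.ChannelBufferSize = 0                         // little to gain from buffering when each message takes seconds to minutes
config.Offsets.ProcessingTimeout = 10 * time.Minute  // long enough to finish the message currently being processed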

@bayan

bayan commented Mar 10, 2015

Thanks - that's exactly what I did!

Successfully merging this pull request may close these issues.

Null pointer exception when starting another instance for the same consumer group