-
Notifications
You must be signed in to change notification settings - Fork 1.8k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
fixed ConsumerGroup flooding logs with client/metadata update req #1544 #1578
Conversation
@antsbean I think this PR looks good for fixing the problem described in #1544, particularly as you no-longer call RefreshMetadata in Ideally I think it would make sense rather than having this polling background thread on a Ticker if we had the client expose a slice of channels that we could AddMetadataSubscriber/RemoveMetadataSubscriber on, to receive a broadcast notification whenever the client's |
…artitionsNumber Coroutine
@dnwe your think is good, but After reviewing the relevant code of kakfa consumer, I think that the consumer group will not call RefreshMata after receiving the message normally from kafka server, it will only call it when please see code
then
|
I'm not sure I understand what you mean? The code you link to is fine, that's just the consumer forcing a refresh of metadata for specific topics when a partitionConsumer received an error (e.g., if leadership has changed or whatever) — it doesn't need to refresh the metadata whilst it is receiving messages successfully. The |
@dnwe I mean |
@antsbean I still don't understand, but that's ok 😀 FYI your branch needs rebasing
|
@d1egoaz can you review and merge this one? Whilst it may not 100% be the final solution we'd want, it does fix #1544 which is in the current release-tagged version so it'd be good to push it through We should probably merge your zstd PR and bundle up any other fixes from the PR's queue and then cut another release version really |
if this is fixing #1544 we should merge it now. But we should loop back to what @dnwe suggested here: |
thanks for the contribution @antsbean |
if it only calls I wonder if the original intention of #1525 was lost here, and this doesn't make any sense now. |
@d1egoaz the client is already refreshing its metadata in the background goroutine so topicToPartitionNumbers will get the last known state from that refresh when called by the ticker. If the number of partitions for any topic differs to the one stored in the ticker loop then that returns out the for loop, calling session.cancel() which will force the consumer to leave and rejoin the group |
so, https://github.com/Shopify/sarama/blob/221ed1a25150fa98d750f2e2f33f58ff018238b4/client.go#L901 Thanks @dnwe that makes sense now! |
Yep I think so. So we should detect new partitions (if a topic is scaled) in less than |
I have a question,if oldTopicToPartitionNum, err = c.topicToPartitionNumbers(topics); err != nil { |
reduce refresh metadata frequency to avoid flood logs