-
Notifications
You must be signed in to change notification settings - Fork 749
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: Rate limit for Kafka and initial offset support and refactor config #829
Conversation
eventsources/sources/kafka/start.go
Outdated
//1000000000 is 1 second in nanoseconds | ||
d := (1000000000 / time.Duration(consumer.kafkaEventSource.LimitEventsPerSecond) * time.Nanosecond) * time.Nanosecond | ||
consumer.logger.Infof("Sleeping for: %v.", d) | ||
sleepContext(consumer.ctx, d) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Shouldn't this simply be time.Sleep(d)
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
So the reason behind this is to just be kind to cleanup code this a comment from sarama Please note, that once a rebalance is triggered, sessions must be completed within Config.Consumer.Group.Rebalance.Timeout. This means that ConsumeClaim() functions must exit as quickly as possible to allow time for Cleanup() and the final offset commit. If the timeout is exceeded, the consumer will be removed from the group by Kafka, which will cause offset commit failures.
By doing a sleep this way if the ctx is canceled the sleep will also get canceled.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't see any relationship between them, or I missed anything? ctx
only gets canceled when the event source service gets a SIGTERM, however the rebalance is something happens on the kafka server side, it will not lead to a ctx cancel.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
So it is pretty minor I suppose on an exit from SIGTERM I thought it would still be nice to give the cleanup as much time as possible granted it currently does nothing so probably moot. We also only go down to a min of 1 rps so that would be a max sleep of 100ms which really is not that long. I can move it to just a sleep.
eventsources/sources/kafka/start.go
Outdated
// Consumer represents a Sarama consumer group consumer | ||
type Consumer struct { | ||
ready chan bool | ||
dispatch func([]byte) error | ||
logger *zap.SugaredLogger | ||
kafkaEventSource *v1alpha1.KafkaEventSource | ||
ctx context.Context |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Then we don't need ctx
here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM, I would like @VaibhavPage to take a further look.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM!!
…nfig (#829) * add a rate limit to kafka event source * refactor sarama config and move kafka version up a level * fix most linter issues * fix last lint issue by bypassing * update example * make requested changes * update kafka example for new version * add +optional back in after removal of comments * add more optional based on settings that can be defaulted * remove context based cancelable sleep * remove unused context
…nfig (#829) * add a rate limit to kafka event source * refactor sarama config and move kafka version up a level * fix most linter issues * fix last lint issue by bypassing * update example * make requested changes * update kafka example for new version * add +optional back in after removal of comments * add more optional based on settings that can be defaulted * remove context based cancelable sleep * remove unused context
…nfig (argoproj#829) * add a rate limit to kafka event source * refactor sarama config and move kafka version up a level * fix most linter issues * fix last lint issue by bypassing * update example * make requested changes * update kafka example for new version * add +optional back in after removal of comments * add more optional based on settings that can be defaulted * remove context based cancelable sleep * remove unused context
This adds a simple rate limit to kafka event creation. This also refactor and cleans up a bit of the code to setup kafka config for sarama. I also moved the kafka version variable out of consumer groups it actually needs to be set in order to pull
msg.Timestamp.String()
Can see issue here