-
Notifications
You must be signed in to change notification settings - Fork 1.8k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Handle kafka "null messages" for deleting in a compacted topic #634
Comments
I've been digging into this a bit further. There are some things:
I'm not 100% clear on what sarama's opinion on backwards compatibility with Kafka broker versions is here - getting a Thoughts? I've already written the sarama code for handling this case and tested it against a kafka broker. I'm somewhat loathe to use uncompressed topics just to workaround this issue with sarama. |
Quick note since it's the weekend: compaction is not something that we use internally, so we have put no effort into properly supporting it, but we have no objections to merging fixes or taking suggestions of that sort. Our supported versions of Kafka are documented in the README; we've had enough of these cases that I plan to introduce a configuration value so you can specify which broker version you are talking to and it will behave correctly. More tomorrow. |
Kafka uses a length of -1 for bytes and strings to distinguish Because kafka's protocol is weird about compression (e.g. the compressed payload must be a nested message set) I would not be surprised if we have some issues sending or receiving nils in those cases. Bug fixes welcome, and we're happy to err on the side of being more permissive for older versions, at least until I get around to implementing the "which kafka version" config value. |
@tcrayford : Do you mind to share your sample code for deleting message? I set ProducerMessage.Value to nil but found nothing deleted. Tried Kafka 0.9 and 0.8.1.1. |
Per https://kafka.apache.org/documentation.html#compaction messages with null payloads are valid when using log compaction. Make this work for all messages, not just uncompressed ones. Should fix #634.
According to Kafka's source, the way to "delete" messages is to send a payload with a size of
-1
. Looking at sarama, it doesn't seem like this is supported, and https://github.com/Shopify/sarama/blob/2b18ad7079cca9c46a012cb6e7043e7de2985482/async_producer.go#L151 seems to suggest that this will end up with an incorrect header being sent to Kafka. Are you interested in a change to support this? I think we'd just want an additional check in that function that doesn't change size if the value encoder's length is-1
.As another supporting addition, it seems like it would be a good idea to have a
DeleteMessageEncoder
that is just an empty struct that handles this logic.Thoughts? I'm happy doing all the coding/testing work here.
The text was updated successfully, but these errors were encountered: