Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KAFKA-4514: Add Codec for ZStandard Compression #2267

Merged
merged 22 commits into from
Oct 10, 2018

Conversation

dongjinleekr
Copy link
Contributor

Hello. This PR resolves KAFKA-4514: Add Codec for ZStandard Compression. Please have a look when you are free. Since I am a total newbie of Apache Kafka, feel free to point out the deficiencies.

Add to the feature itself, I have a question: Should we support an option for ZStandard compression level?

According to ZStandard official documentation, it supports compression level of 1 ~ 22. Because of that, Hadoop added a new configuration option named "io.compression.codec.zstd.level", whose default value is 3. In this PR, I configured the compression level to 1 as a temporary one but wondering following problems:

  • Should we provide a configurable option?
  • Would it better to change the default value, from 1 to another one?

I am looking forward to your advice. Thanks.

@asfbot

This comment has been minimized.

@@ -69,6 +69,14 @@ public Constructor get() throws ClassNotFoundException, NoSuchMethodException {
}
});

private static MemoizingConstructorSupplier zStd4OutputStreamSupplier = new MemoizingConstructorSupplier(new ConstructorSupplier() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have no context on this patch but I'm guessing you have a typo:
currently: zStd4OutputStreamSupplier
probably intended: zStdOutputStreamSupplier

You have a 4 in the variable name.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh my. Got it.

@asfbot

This comment has been minimized.

@asfbot

This comment has been minimized.

@tgravescs
Copy link

2 big questions here.

  1. whether this needs to go through the KIP process. @ijuma mentioned that in the jira
  2. Whether we want to pull from luben/std-jni, which is someones private repo? It is bsd licensed but its not in apache so any bug fixes and such they would have to pull in and question about maintenance and such. Since BSD licensed probably could just copy or could also copy the Hadoop implementation ones. Anyway this is more question for the Kafka committers.

@tgravescs
Copy link

Also, thanks for working on this. Did you have a chance to run any performance comparisons?

@ijuma
Copy link
Member

ijuma commented Dec 16, 2016

Thanks for the PR.

To answer @tgravescs's questions:

  1. Generally we require KIPs for changes that affect the message format because we want such changes to be vetted thoroughly. In this case, we are using an additional bit in the attributes byte and we don't have many of those left, so good to get feedback from a wider group.

  2. That is a potential concern. We would be more likely to include it if we could use an established library (I understand that this is a new compression algorithm so it may be hard to find).

Performance numbers would definitely help the discussion.

@dongjinleekr
Copy link
Contributor Author

@tgravescs @ijuma

Thanks for your advice. Let me summarize:

I will conduct a test benchmark a couple of days, and submit a proposal on KIP with the results. From there, let's discuss the following topics there:

  1. Whether we extend current message format to support ZStandard.
  2. Whether we use the (already written) ZStandard library or write some JNI code directly on Kafka.
  3. Whether we provide a Default compression level and what value it should be.

I just started configuring benchmark environment on AWS. Please tune in!

@asfbot

This comment has been minimized.

@asfbot

This comment has been minimized.

@asfbot

This comment has been minimized.

@asfbot

This comment has been minimized.

@asfbot

This comment has been minimized.

@asfbot

This comment has been minimized.

@asfbot

This comment has been minimized.

@asfbot

This comment has been minimized.

@asfbot

This comment has been minimized.

@asfbot

This comment has been minimized.

@asfbot

This comment has been minimized.

@asfbot

This comment has been minimized.

@asfbot

This comment has been minimized.

@asfbot

This comment has been minimized.

@asfbot

This comment has been minimized.

@asfbot

This comment has been minimized.

@asfbot

This comment has been minimized.

@asfbot

This comment has been minimized.

@hachikuji hachikuji merged commit 741cb76 into apache:trunk Oct 10, 2018
hachikuji pushed a commit that referenced this pull request Oct 10, 2018
This patch adds support for zstandard compression to Kafka as documented in KIP-110: https://cwiki.apache.org/confluence/display/KAFKA/KIP-110%3A+Add+Codec+for+ZStandard+Compression. 

Reviewers: Ivan Babrou <[email protected]>, Ismael Juma <[email protected]>, Jason Gustafson <[email protected]>
@hachikuji
Copy link

Merged to trunk and 2.1. Thanks again for your persistence! Great contribution!

@ijuma
Copy link
Member

ijuma commented Oct 10, 2018

Thanks for your contribution @dongjinleekr! Is https://twitter.com/dongjinleekr, your Twitter handle?

hachikuji pushed a commit that referenced this pull request Oct 10, 2018
PR #2267 Introduced support for Zstandard compression. The relevant test expects values for `num_nodes` and `num_producers` based on the (now-incremented) count of compression types.

Passed the affected, previously-failing test:
`ducker-ak test tests/kafkatest/tests/client/compression_test.py`

Reviewers: Jason Gustafson <[email protected]>
hachikuji pushed a commit that referenced this pull request Oct 10, 2018
PR #2267 Introduced support for Zstandard compression. The relevant test expects values for `num_nodes` and `num_producers` based on the (now-incremented) count of compression types.

Passed the affected, previously-failing test:
`ducker-ak test tests/kafkatest/tests/client/compression_test.py`

Reviewers: Jason Gustafson <[email protected]>
@davewat
Copy link

davewat commented Nov 13, 2018

Was a param introduced to adjust the compression level?

@scottcarey
Copy link

scottcarey commented Nov 13, 2018 via email

@davewat
Copy link

davewat commented Nov 14, 2018

@dongjinleekr where is the compression level specified? I can't seem to locate it, even in source. Right now I am getting better compression with gzip still, and want to adjust the level. Thanks.

@scottcarey
Copy link

scottcarey commented Nov 14, 2018 via email

@eliaslevy
Copy link
Contributor

@scottcarey while it is an anti-pattern, the broker can compress/recompress messages depending on the the topic's compression.type configuration, which could be set to zstd .

@ijuma
Copy link
Member

ijuma commented Nov 14, 2018

Producer configs have to be in the KIP (any config is considered public API). I don't think there's a way to configure the compression level at this point. If someone wants to contribute that, it would make to allow it for other compression types too.

@scottcarey
Copy link

scottcarey commented Nov 14, 2018 via email

@scottcarey
Copy link

scottcarey commented Nov 14, 2018 via email

@ijuma
Copy link
Member

ijuma commented Nov 14, 2018

Setting the topic config to zstd while not compressing in the producer is allowed for all compression algorithms.

@scottcarey
Copy link

scottcarey commented Nov 14, 2018 via email

@davewat
Copy link

davewat commented Nov 14, 2018

@scottcarey I wouldn't say it's irrelevant on the broker. We have older producers that don't/ won't speak zstd, but our (testing) Brokers force zstd on the topic for storage/ space requirements. The broker's ability to set the compression level is key in our use case.

I just can't figure out where in this PR the level is set - though now I am thinking we are just utilizing the default (3).

@dongjinleekr
Copy link
Contributor Author

@davewat @scottcarey @ijuma Sorry for the late reply. In fact, I already investigated this issue (support compression level for zstd) but I concluded that it would be much better to put this issue into separated one and focus on implementing the ZSTD feature only.

Here is why: All compression codecs (i.e., Gzip, Snappy, LZ4 and ZSTD) support some parameters to change the degree of compression; However, only LZ4 and ZSTD supports the concept of 'level' - in the case of GZIP and Snappy, they require the block size parameter, not 'level'.

To make the compression level feature available, we must modify the API signatures of MemoryRecordsBuilder to support compression level, and add some validation logic1. It requires additional modifications to read ProducerConfig value and pass it into MemoryRecordsBuilder. Of course, this work requires a bunch of modifications and some policies on various codecs. It is why I decided to put off this issue and use the default level for ZSTD, that is, 3.

How about your opinion? Do you really need it? Does it sound reasonable?

If the answer is Yes, please file it to Jira with 'needs-kip' tag; Then, I will take the issue - I will make the proposal.

Footnotes

  1. Check whether given CompressionCodec supports the concept of 'level,' and whether given compression level is valid for the CompressionCodec. (e.g., ZSTD supports 22 levels but LZ4 supports 4 levels only.)

@luben
Copy link

luben commented Nov 15, 2018

@dongjinleekr , zstd also supports negative levels for faster compression, it's equivalent of the --fast X in the CLI

@davewat
Copy link

davewat commented Nov 15, 2018

@dongjinleekr our use case needs it, not sure if we are just a corner case. I have created the issue in Jira. Thanks!

https://issues.apache.org/jira/browse/KAFKA-7632

@scottcarey
Copy link

scottcarey commented Nov 15, 2018 via email

@bobrik
Copy link
Contributor

bobrik commented Nov 15, 2018

When I added zstd support for Go library, I also added compression level support:

It applies to zstd and gzip.

@dongjinleekr
Copy link
Contributor Author

@scottcarey @luben Thank you for the correction. Right, ZSTD now supports negative compression level (#1 #2) and GZIP is also able to use compression levels, although it is blocked in the official API - but we can make use of it with some workaround. These features should be supported in the implementation.

@davewat Thank you for filing the issue. I just updated the issue applying the comments here and now working on the KIP. I will give you slack when I complete the document and open the discussion thread.

@bobrik Great. Sarama always guides us the direction! I will include sarama's case in the KIP.

@dongjinleekr
Copy link
Contributor Author

@davewat @scottcarey @eliaslevy @ijuma I just opened the discussion thread in dev mailing list. Let's continue the discussion there.

cc/ @bobrik @luben

@luben
Copy link

luben commented Nov 19, 2018

@dongeforever , just released zstd-jni-1.3.7-2 with support to query min/max compression levels

pengxiaolong pushed a commit to pengxiaolong/kafka that referenced this pull request Jun 14, 2019
This patch adds support for zstandard compression to Kafka as documented in KIP-110: https://cwiki.apache.org/confluence/display/KAFKA/KIP-110%3A+Add+Codec+for+ZStandard+Compression. 

Reviewers: Ivan Babrou <[email protected]>, Ismael Juma <[email protected]>, Jason Gustafson <[email protected]>
pengxiaolong pushed a commit to pengxiaolong/kafka that referenced this pull request Jun 14, 2019
PR apache#2267 Introduced support for Zstandard compression. The relevant test expects values for `num_nodes` and `num_producers` based on the (now-incremented) count of compression types.

Passed the affected, previously-failing test:
`ducker-ak test tests/kafkatest/tests/client/compression_test.py`

Reviewers: Jason Gustafson <[email protected]>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.