KAFKA-4514: Add Codec for ZStandard Compression #2267

Merged: 22 commits, Oct 10, 2018

Commits (22)
d4b2bf4
KAFKA-4514: Add Codec for ZStandard Compression
dongjinleekr Dec 16, 2016
bda05f7
Minor updates
dongjinleekr Mar 10, 2018
82ba81a
Update ZStd version: 1.3.3 -> 1.3.4 (zstd-jni 1.3.4-10)
dongjinleekr Jun 13, 2018
51bdf15
Update ZStd version: 1.3.4 -> 1.3.5 (zstd-jni 1.3.5-2)
dongjinleekr Jul 14, 2018
35dcffa
Add license description of zstd and zstd-jni
dongjinleekr Aug 12, 2018
8872bdf
Disallow fetch request with API version < 10 for ZStandard compressed…
dongjinleekr Aug 31, 2018
fbdcabe
Disallow produce request with API version < 7 for ZStandard compresse…
dongjinleekr Sep 16, 2018
74d6b19
Disallow MemoryRecordsBuilder instantiation with ZSTD compression cod…
dongjinleekr Sep 21, 2018
671e12f
Disallow downconversion of ZSTD compressed records
dongjinleekr Sep 25, 2018
c6bbabd
Make the constructor of ProduceRequest.Builder public: 1. parity with…
dongjinleekr Sep 26, 2018
e5a7fd4
Add integration tests: FetchRequestTest, ProduceRequestTest
dongjinleekr Sep 26, 2018
06b2a59
Update zstd-jni version: 1.3.5-2 -> 1.3.5-4
dongjinleekr Sep 27, 2018
561d21c
Fix: remove unneeded newline addition at CompressionType.java
dongjinleekr Sep 28, 2018
3b4f6ae
Revert "Disallow downconversion of ZSTD compressed records"
dongjinleekr Oct 3, 2018
607defc
Reimplement down-conversion logic
dongjinleekr Oct 3, 2018
f34d323
Update for the previous commit (Thanks to Jason Gustafson's guide)
dongjinleekr Oct 4, 2018
1eee287
Add UNSUPPORTED_COMPRESSION_TYPE to the possible error codes list of …
dongjinleekr Oct 5, 2018
bf95957
Refactor tests: MemoryRecordsTest, MemoryRecordsBuilderTest
dongjinleekr Oct 5, 2018
36d77af
Remove ProduceRequest version check in KafkaApis#handleProduceRequest…
dongjinleekr Oct 5, 2018
54eaa61
Add validation for the case magic < 2 with Zstandard Compression
dongjinleekr Oct 5, 2018
65c57a1
Consistency: all log or exception messages are using 'ZStandard', not…
dongjinleekr Oct 5, 2018
147c473
Add validation for ProduceRequest: update ProduceRequest.validateRecords
dongjinleekr Oct 5, 2018
66 changes: 66 additions & 0 deletions LICENSE
@@ -201,6 +201,7 @@
See the License for the specific language governing permissions and
limitations under the License.

------------------------------------------------------------------------------------
This distribution has a binary dependency on jersey, which is available under the CDDL
License as described below.

@@ -328,3 +329,68 @@ As between Initial Developer and the Contributors, each party is responsible for
NOTICE PURSUANT TO SECTION 9 OF THE COMMON DEVELOPMENT AND DISTRIBUTION LICENSE (CDDL)

The code released under the CDDL shall be governed by the laws of the State of California (excluding conflict-of-law provisions). Any litigation relating to this License shall be subject to the jurisdiction of the Federal Courts of the Northern District of California and the state courts of the State of California, with venue lying in Santa Clara County, California.

------------------------------------------------------------------------------------
This distribution has a binary dependency on zstd, which is available under the BSD 3-Clause License as described below.

BSD License

For Zstandard software

Copyright (c) 2016-present, Facebook, Inc. All rights reserved.

Redistribution and use in source and binary forms, with or without modification,
are permitted provided that the following conditions are met:

* Redistributions of source code must retain the above copyright notice, this
list of conditions and the following disclaimer.

* Redistributions in binary form must reproduce the above copyright notice,
this list of conditions and the following disclaimer in the documentation
and/or other materials provided with the distribution.

* Neither the name Facebook nor the names of its contributors may be used to
endorse or promote products derived from this software without specific
prior written permission.

THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR
ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
(INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON
ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

------------------------------------------------------------------------------------
This distribution has a binary dependency on zstd-jni, which is available under the BSD 2-Clause License
as described below.

Zstd-jni: JNI bindings to Zstd Library

Copyright (c) 2015-2016, Luben Karavelov/ All rights reserved.

BSD License

Redistribution and use in source and binary forms, with or without modification,
are permitted provided that the following conditions are met:

* Redistributions of source code must retain the above copyright notice, this
list of conditions and the following disclaimer.

* Redistributions in binary form must reproduce the above copyright notice, this
list of conditions and the following disclaimer in the documentation and/or
other materials provided with the distribution.

THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR
ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
(INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON
ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

Member:
Is this LICENSE change needed? Cc @ewencp

Contributor:
I'm not sure why most of the additions here were made, tbh.

All of this with the caveat IANAL and any questions re: licensing are best directed at ASF legal.

In many cases, the NOTICE file might be a more appropriate location depending on where the LICENSE actually gets pulled into the binary distribution. This gets confusing because of handling both source and binary distributions. In source distributions, including additional info in the LICENSE file would really only apply to anything copied in afaik, since the source distribution doesn't include any of the dependency binaries, i.e. what's under license in that distribution is only our code. (Probably gets messier with top-of-file copyright notices..., and even NOTICE files seem messy because, e.g., Apache explicitly calls them out while other licenses don't)

In binary distributions, adding here wouldn't change, for example, whether we reproduce the BSD license notice as requested -- either you think it needs to be directly printed (which this doesn't do) or just including the original JARs with their original licenses would be enough since any binary distribution that includes the dependencies would have that in the jar (or if they were missing, upstream should fix that). At a bare minimum, we don't currently include nearly all the dependency licenses here and I don't think most Apache projects do. I haven't looked back at the history to understand what triggered people to start including more than the Apache license in this file.

Again, IANAL, but it's actually unclear you really need anything beyond the raw license given the way source distributions and java binary distributions would work & preserve other contents (modulo uberjar conflicts, which we don't do). I think Apache is the only OSS license where there's a "readable" phrase that might complicate things.

But IANAL, so probably figuring this out w/ apache legal would give us more confidence in the answer.

hachikuji:
@ijuma @ewencp The spark folks appear to have taken a principled approach here: apache/spark#21640. Ultimately they still include the zstd and zstd-jni license files. I think we can probably commit this as is and perhaps file a JIRA to come up with a standardized approach (probably just use the same convention as spark). Does that sound reasonable?

Contributor Author:
FYI: In fact, apache/spark#21640 (what @hachikuji pointed out) is what I referred to while I was working on this commit. (It is also mentioned in the KIP document.) It was initiated from this message in spark-dev mailing list, sent by a PMC member.

Member:
@hachikuji I'm fine with merging and submitting a JIRA. It's clear that we're not consistent, so we probably should clean it up before the release.

hachikuji:
Sounds good. I will merge today.

1 change: 1 addition & 0 deletions build.gradle
@@ -820,6 +820,7 @@ project(':clients') {
conf2ScopeMappings.addMapping(1000, configurations.jacksonDatabindConfig, "provided")

dependencies {
compile libs.zstd
compile libs.lz4
compile libs.snappy
compile libs.slf4jApi
@@ -159,7 +159,7 @@ public class ProducerConfig extends AbstractConfig {
/** <code>compression.type</code> */
public static final String COMPRESSION_TYPE_CONFIG = "compression.type";
private static final String COMPRESSION_TYPE_DOC = "The compression type for all data generated by the producer. The default is none (i.e. no compression). Valid "
+ " values are <code>none</code>, <code>gzip</code>, <code>snappy</code>, or <code>lz4</code>. "
+ " values are <code>none</code>, <code>gzip</code>, <code>snappy</code>, <code>lz4</code>, or <code>zstd</code>. "
+ "Compression is of full batches of data, so the efficacy of batching will also impact the compression ratio (more batching means better compression).";

/** <code>metrics.sample.window.ms</code> */
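
For reference, a producer opts into the new codec simply by setting compression.type to the value this doc string now lists; below is a minimal sketch, where the bootstrap address and topic name are placeholder assumptions rather than anything from this PR:

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class ZstdProducerExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder address
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "zstd"); // value added by this PR
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // Whole record batches are compressed with zstd before being sent to the broker.
            producer.send(new ProducerRecord<>("test-topic", "key", "value")); // placeholder topic
        }
    }
}

As enforced elsewhere in this PR, brokers reject zstd batches in produce requests older than v7 and will not down-convert them for fetch requests older than v10.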
@@ -140,7 +140,7 @@ public class TopicConfig {

public static final String COMPRESSION_TYPE_CONFIG = "compression.type";
public static final String COMPRESSION_TYPE_DOC = "Specify the final compression type for a given topic. " +
"This configuration accepts the standard compression codecs ('gzip', 'snappy', lz4). It additionally " +
"This configuration accepts the standard compression codecs ('gzip', 'snappy', 'lz4', 'zstd'). It additionally " +
"accepts 'uncompressed' which is equivalent to no compression; and 'producer' which means retain the " +
"original compression codec set by the producer.";

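The topic-level override can be set the same way through the Admin client; a minimal sketch using the alterConfigs API that was current at the time of this PR, with the bootstrap address and topic name again as placeholders:

import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.config.TopicConfig;

public class ZstdTopicConfigExample {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder address
        try (AdminClient admin = AdminClient.create(props)) {
            ConfigResource topic = new ConfigResource(ConfigResource.Type.TOPIC, "test-topic"); // placeholder topic
            Config config = new Config(Collections.singleton(
                    new ConfigEntry(TopicConfig.COMPRESSION_TYPE_CONFIG, "zstd")));
            // With the topic-level setting, the broker stores batches for this topic in zstd
            // regardless of the codec the producer used (unless 'producer' is configured).
            admin.alterConfigs(Collections.singletonMap(topic, config)).all().get();
        }
    }
}
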
@@ -0,0 +1,34 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.common.errors;

/**
* The requesting client does not support the compression type of given partition.
*/
public class UnsupportedCompressionTypeException extends ApiException {

private static final long serialVersionUID = 1L;

public UnsupportedCompressionTypeException(String message) {
super(message);
}

public UnsupportedCompressionTypeException(String message, Throwable cause) {
super(message, cause);
}

}
@@ -91,6 +91,7 @@
import org.apache.kafka.common.errors.UnknownServerException;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
import org.apache.kafka.common.errors.UnsupportedByAuthenticationException;
import org.apache.kafka.common.errors.UnsupportedCompressionTypeException;
import org.apache.kafka.common.errors.UnsupportedForMessageFormatException;
import org.apache.kafka.common.errors.UnsupportedSaslMechanismException;
import org.apache.kafka.common.errors.UnsupportedVersionException;
@@ -284,7 +285,9 @@ public enum Errors {
FENCED_LEADER_EPOCH(74, "The leader epoch in the request is older than the epoch on the broker",
FencedLeaderEpochException::new),
UNKNOWN_LEADER_EPOCH(75, "The leader epoch in the request is newer than the epoch on the broker",
UnknownLeaderEpochException::new);
UnknownLeaderEpochException::new),
UNSUPPORTED_COMPRESSION_TYPE(76, "The requesting client does not support the compression type of given partition.",
UnsupportedCompressionTypeException::new);

private static final Logger log = LoggerFactory.getLogger(Errors.class);
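
The new code 76 is resolved through the existing Errors machinery like any other broker error; a small sketch of the round trip, relying only on the enum entry added above:

import org.apache.kafka.common.errors.UnsupportedCompressionTypeException;
import org.apache.kafka.common.protocol.Errors;

public class UnsupportedCompressionTypeErrorCheck {
    public static void main(String[] args) {
        // Brokers put code 76 on the wire; clients resolve it back to the enum entry ...
        Errors error = Errors.forCode((short) 76);
        System.out.println(error); // UNSUPPORTED_COMPRESSION_TYPE
        // ... and the entry's builder produces the new exception type.
        System.out.println(error.exception() instanceof UnsupportedCompressionTypeException); // true
    }
}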

@@ -322,6 +322,8 @@ private DeepRecordsIterator(AbstractLegacyRecordBatch wrapperEntry,
throw new InvalidRecordException("Invalid wrapper magic found in legacy deep record iterator " + wrapperMagic);

CompressionType compressionType = wrapperRecord.compressionType();
if (compressionType == CompressionType.ZSTD)
throw new InvalidRecordException("Invalid wrapper compressionType found in legacy deep record iterator " + wrapperMagic);
ByteBuffer wrapperValue = wrapperRecord.value();
if (wrapperValue == null)
throw new InvalidRecordException("Found invalid compressed record set with null value (magic = " +
@@ -113,6 +113,26 @@ public InputStream wrapForInput(ByteBuffer inputBuffer, byte messageVersion, Buf
throw new KafkaException(e);
}
}
},

ZSTD(4, "zstd", 1.0f) {
@Override
public OutputStream wrapForOutput(ByteBufferOutputStream buffer, byte messageVersion) {
try {
return (OutputStream) ZstdConstructors.OUTPUT.invoke(buffer);
} catch (Throwable e) {
throw new KafkaException(e);
}
}

@Override
public InputStream wrapForInput(ByteBuffer buffer, byte messageVersion, BufferSupplier decompressionBufferSupplier) {
try {
return (InputStream) ZstdConstructors.INPUT.invoke(new ByteBufferInputStream(buffer));
} catch (Throwable e) {
throw new KafkaException(e);
}
}
};

public final int id;
@@ -156,6 +176,8 @@ public static CompressionType forId(int id) {
return SNAPPY;
case 3:
return LZ4;
case 4:
return ZSTD;
default:
throw new IllegalArgumentException("Unknown compression type id: " + id);
}
@@ -170,14 +192,16 @@ else if (SNAPPY.name.equals(name))
return SNAPPY;
else if (LZ4.name.equals(name))
return LZ4;
else if (ZSTD.name.equals(name))
return ZSTD;
else
throw new IllegalArgumentException("Unknown compression name: " + name);
}

// We should only have a runtime dependency on compression algorithms in case the native libraries don't support
// some platforms.
//
// For Snappy, we dynamically load the classes and rely on the initialization-on-demand holder idiom to ensure
// For Snappy and Zstd, we dynamically load the classes and rely on the initialization-on-demand holder idiom to ensure
// they're only loaded if used.
//
// For LZ4 we are using org.apache.kafka classes, which should always be in the classpath, and would not trigger
@@ -190,6 +214,13 @@ private static class SnappyConstructors {
MethodType.methodType(void.class, OutputStream.class));
}

private static class ZstdConstructors {
static final MethodHandle INPUT = findConstructor("com.github.luben.zstd.ZstdInputStream",
MethodType.methodType(void.class, InputStream.class));
static final MethodHandle OUTPUT = findConstructor("com.github.luben.zstd.ZstdOutputStream",
MethodType.methodType(void.class, OutputStream.class));
}

private static MethodHandle findConstructor(String className, MethodType methodType) {
try {
return MethodHandles.publicLookup().findConstructor(Class.forName(className), methodType);
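
The ZstdConstructors holder above relies on the initialization-on-demand idiom mentioned in the comment, so zstd-jni is only resolved when ZSTD is actually used; here is an isolated sketch of that same pattern, with a purely hypothetical codec class name that is not part of this PR:

import java.io.OutputStream;
import java.lang.invoke.MethodHandle;
import java.lang.invoke.MethodHandles;
import java.lang.invoke.MethodType;

public class LazyCodecLoading {

    // The nested holder is only initialized (and the optional codec class only looked up)
    // the first time Holder.OUTPUT is touched, never during LazyCodecLoading's own init.
    private static class Holder {
        static final MethodHandle OUTPUT = findConstructor(
                "com.example.OptionalCodecOutputStream", // hypothetical class, for illustration only
                MethodType.methodType(void.class, OutputStream.class));
    }

    private static MethodHandle findConstructor(String className, MethodType methodType) {
        try {
            return MethodHandles.publicLookup().findConstructor(Class.forName(className), methodType);
        } catch (ReflectiveOperationException e) {
            throw new RuntimeException("Optional codec not on the classpath", e);
        }
    }

    public static OutputStream wrap(OutputStream out) throws Throwable {
        // Only this call forces Holder, and therefore the codec class, to load.
        return (OutputStream) Holder.OUTPUT.invoke(out);
    }
}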
@@ -44,6 +44,9 @@ public class LazyDownConversionRecords implements BaseRecords {
* @param firstOffset The starting offset for down-converted records. This only impacts some cases. See
* {@link RecordsUtil#downConvert(Iterable, byte, long, Time)} for an explanation.
* @param time The time instance to use
*
* @throws org.apache.kafka.common.errors.UnsupportedCompressionTypeException If the first batch to down-convert
* has a compression type which we do not support down-conversion for.
*/
public LazyDownConversionRecords(TopicPartition topicPartition, Records records, byte toMagic, long firstOffset, Time time) {
this.topicPartition = Objects.requireNonNull(topicPartition);
@@ -150,7 +153,7 @@ protected ConvertedRecords makeNext() {
}

while (batchIterator.hasNext()) {
List<RecordBatch> batches = new ArrayList<>();
final List<RecordBatch> batches = new ArrayList<>();
boolean isFirstBatch = true;
long sizeSoFar = 0;

Expand All @@ -162,6 +165,7 @@ protected ConvertedRecords makeNext() {
sizeSoFar += currentBatch.sizeInBytes();
isFirstBatch = false;
}

ConvertedRecords convertedRecords = RecordsUtil.downConvert(batches, toMagic, firstOffset, time);
// During conversion, it is possible that we drop certain batches because they do not have an equivalent
// representation in the message format we want to convert to. For example, V0 and V1 message formats
@@ -17,6 +17,7 @@
package org.apache.kafka.common.record;

import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.UnsupportedCompressionTypeException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -45,35 +46,50 @@ public LazyDownConversionRecordsSend(String destination, LazyDownConversionRecor
convertedRecordsIterator = records().iterator(MAX_READ_SIZE);
}

private MemoryRecords buildOverflowBatch(int remaining) {
// We do not have any records left to down-convert. Construct an overflow message for the length remaining.
// This message will be ignored by the consumer because its length will be past the length of maximum
// possible response size.
// DefaultRecordBatch =>
// BaseOffset => Int64
// Length => Int32
// ...
ByteBuffer overflowMessageBatch = ByteBuffer.allocate(
Math.max(MIN_OVERFLOW_MESSAGE_LENGTH, Math.min(remaining + 1, MAX_READ_SIZE)));
overflowMessageBatch.putLong(-1L);

// Fill in the length of the overflow batch. A valid batch must be at least as long as the minimum batch
// overhead.
overflowMessageBatch.putInt(Math.max(remaining + 1, DefaultRecordBatch.RECORD_BATCH_OVERHEAD));
log.debug("Constructed overflow message batch for partition {} with length={}", topicPartition(), remaining);
return MemoryRecords.readableRecords(overflowMessageBatch);
}

@Override
public long writeTo(GatheringByteChannel channel, long previouslyWritten, int remaining) throws IOException {
if (convertedRecordsWriter == null || convertedRecordsWriter.completed()) {
MemoryRecords convertedRecords;
// Check if we have more chunks left to down-convert
if (convertedRecordsIterator.hasNext()) {
// Get next chunk of down-converted messages
ConvertedRecords<MemoryRecords> recordsAndStats = convertedRecordsIterator.next();
convertedRecords = recordsAndStats.records();
recordConversionStats.add(recordsAndStats.recordConversionStats());
log.debug("Down-converted records for partition {} with length={}", topicPartition(), convertedRecords.sizeInBytes());
} else {
// We do not have any records left to down-convert. Construct an overflow message for the length remaining.
// This message will be ignored by the consumer because its length will be past the length of maximum
// possible response size.
// DefaultRecordBatch =>
// BaseOffset => Int64
// Length => Int32
// ...
ByteBuffer overflowMessageBatch = ByteBuffer.allocate(
Math.max(MIN_OVERFLOW_MESSAGE_LENGTH, Math.min(remaining + 1, MAX_READ_SIZE)));
overflowMessageBatch.putLong(-1L);

// Fill in the length of the overflow batch. A valid batch must be at least as long as the minimum batch
// overhead.
overflowMessageBatch.putInt(Math.max(remaining + 1, DefaultRecordBatch.RECORD_BATCH_OVERHEAD));
convertedRecords = MemoryRecords.readableRecords(overflowMessageBatch);
log.debug("Constructed overflow message batch for partition {} with length={}", topicPartition(), remaining);
try {
// Check if we have more chunks left to down-convert
if (convertedRecordsIterator.hasNext()) {
// Get next chunk of down-converted messages
ConvertedRecords<MemoryRecords> recordsAndStats = convertedRecordsIterator.next();
convertedRecords = recordsAndStats.records();
recordConversionStats.add(recordsAndStats.recordConversionStats());
log.debug("Down-converted records for partition {} with length={}", topicPartition(), convertedRecords.sizeInBytes());
} else {
convertedRecords = buildOverflowBatch(remaining);
}
} catch (UnsupportedCompressionTypeException e) {
// We have encountered a compression type which does not support down-conversion (e.g. zstd).
// Since we have already sent at least one batch and we have committed to the fetch size, we
// send an overflow batch. The consumer will read the first few records and then fetch from the
// offset of the batch which has the unsupported compression type. At that time, we will
// send back the UNSUPPORTED_COMPRESSION_TYPE error which will allow the consumer to fail gracefully.
convertedRecords = buildOverflowBatch(remaining);
}

convertedRecordsWriter = new DefaultRecordsSend(destination(), convertedRecords, Math.min(convertedRecords.sizeInBytes(), remaining));
}
return convertedRecordsWriter.writeTo(channel);
@@ -102,6 +102,8 @@ public MemoryRecordsBuilder(ByteBufferOutputStream bufferStream,
throw new IllegalArgumentException("Transactional records are not supported for magic " + magic);
if (isControlBatch)
throw new IllegalArgumentException("Control records are not supported for magic " + magic);
if (compressionType == CompressionType.ZSTD)
throw new IllegalArgumentException("ZStandard compression is not supported for magic " + magic);
}

this.magic = magic;
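
Because message formats older than v2 cannot carry the new codec, the builder now fails fast on that combination; a small sketch of the expected behaviour, assuming the existing MemoryRecords.builder(buffer, magic, compressionType, timestampType, baseOffset) overload:

import java.nio.ByteBuffer;
import org.apache.kafka.common.record.CompressionType;
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.record.RecordBatch;
import org.apache.kafka.common.record.TimestampType;

public class ZstdMagicValidationExample {
    public static void main(String[] args) {
        // Magic v2 (message format 0.11+) accepts zstd without complaint ...
        MemoryRecords.builder(ByteBuffer.allocate(1024), RecordBatch.MAGIC_VALUE_V2,
                CompressionType.ZSTD, TimestampType.CREATE_TIME, 0L);
        // ... while magic v1 is now rejected up front by the check added in this constructor.
        try {
            MemoryRecords.builder(ByteBuffer.allocate(1024), RecordBatch.MAGIC_VALUE_V1,
                    CompressionType.ZSTD, TimestampType.CREATE_TIME, 0L);
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}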