/** <code>compression.type</code> */
public static final String COMPRESSION_TYPE_CONFIG = "compression.type";
private static final String COMPRESSION_TYPE_DOC = "The compression type for all data generated by the producer. The default is none (i.e. no compression). Valid "
- + " values are none
, gzip
, snappy
, or lz4
. "
+ + " values are none
, gzip
, snappy
, lz4
, or zstd
. "
+ "Compression is of full batches of data, so the efficacy of batching will also impact the compression ratio (more batching means better compression).";
/** <code>metrics.sample.window.ms</code> */
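For orientation, here is a minimal sketch of enabling the new codec from a producer. This is not part of the patch; the bootstrap address, topic name, and serializers are illustrative assumptions.

```java
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class ZstdProducerExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumed address
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        // The codec added by this patch. Whole batches are compressed, so
        // more batching generally yields a better compression ratio.
        props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "zstd");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.send(new ProducerRecord<>("example-topic", "key", "value"));
        }
    }
}
```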
diff --git a/clients/src/main/java/org/apache/kafka/common/config/TopicConfig.java b/clients/src/main/java/org/apache/kafka/common/config/TopicConfig.java
index bc657ea738f69..4410c971a169f 100755
--- a/clients/src/main/java/org/apache/kafka/common/config/TopicConfig.java
+++ b/clients/src/main/java/org/apache/kafka/common/config/TopicConfig.java
@@ -140,7 +140,7 @@ public class TopicConfig {
public static final String COMPRESSION_TYPE_CONFIG = "compression.type";
public static final String COMPRESSION_TYPE_DOC = "Specify the final compression type for a given topic. " +
- "This configuration accepts the standard compression codecs ('gzip', 'snappy', lz4). It additionally " +
+ "This configuration accepts the standard compression codecs ('gzip', 'snappy', 'lz4', 'zstd'). It additionally " +
"accepts 'uncompressed' which is equivalent to no compression; and 'producer' which means retain the " +
"original compression codec set by the producer.";
diff --git a/clients/src/main/java/org/apache/kafka/common/errors/UnsupportedCompressionTypeException.java b/clients/src/main/java/org/apache/kafka/common/errors/UnsupportedCompressionTypeException.java
new file mode 100644
index 0000000000000..29ffe1b900e23
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/errors/UnsupportedCompressionTypeException.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.errors;
+
+/**
+ * The requesting client does not support the compression type of the given partition.
+ */
+public class UnsupportedCompressionTypeException extends ApiException {
+
+ private static final long serialVersionUID = 1L;
+
+ public UnsupportedCompressionTypeException(String message) {
+ super(message);
+ }
+
+ public UnsupportedCompressionTypeException(String message, Throwable cause) {
+ super(message, cause);
+ }
+
+}
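How this might surface on a client: a sketch of an application catching the new exception from a consumer poll. The consumer setup is assumed, and whether a given client path throws this fatally or retries is not shown in this diff.

```java
import java.time.Duration;

import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.errors.UnsupportedCompressionTypeException;

public class HandleUnsupportedCompression {
    // 'consumer' is assumed to be an already-configured consumer instance.
    static ConsumerRecords<String, String> pollOnce(KafkaConsumer<String, String> consumer) {
        try {
            return consumer.poll(Duration.ofSeconds(1));
        } catch (UnsupportedCompressionTypeException e) {
            // The broker could not serve zstd data to this client, e.g. because
            // the fetch request version predates zstd support.
            System.err.println("Unsupported compression: " + e.getMessage());
            throw e;
        }
    }
}
```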
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
index 7cf5a29508463..6b54a89daf309 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
@@ -91,6 +91,7 @@
import org.apache.kafka.common.errors.UnknownServerException;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
import org.apache.kafka.common.errors.UnsupportedByAuthenticationException;
+import org.apache.kafka.common.errors.UnsupportedCompressionTypeException;
import org.apache.kafka.common.errors.UnsupportedForMessageFormatException;
import org.apache.kafka.common.errors.UnsupportedSaslMechanismException;
import org.apache.kafka.common.errors.UnsupportedVersionException;
@@ -284,7 +285,9 @@ public enum Errors {
FENCED_LEADER_EPOCH(74, "The leader epoch in the request is older than the epoch on the broker",
FencedLeaderEpochException::new),
UNKNOWN_LEADER_EPOCH(75, "The leader epoch in the request is newer than the epoch on the broker",
- UnknownLeaderEpochException::new);
+ UnknownLeaderEpochException::new),
+ UNSUPPORTED_COMPRESSION_TYPE(76, "The requesting client does not support the compression type of the given partition.",
+ UnsupportedCompressionTypeException::new);
private static final Logger log = LoggerFactory.getLogger(Errors.class);
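A quick sketch of the new code 76 mapping round-tripping through the existing Errors helpers (forCode and forException are existing APIs; the message text here is an assumption):

```java
import org.apache.kafka.common.errors.UnsupportedCompressionTypeException;
import org.apache.kafka.common.protocol.Errors;

public class ErrorsMappingCheck {
    public static void main(String[] args) {
        // The wire error code 76 resolves to the new enum constant...
        System.out.println(Errors.forCode((short) 76)); // UNSUPPORTED_COMPRESSION_TYPE

        // ...and the exception class maps back to the same constant.
        Errors mapped = Errors.forException(
                new UnsupportedCompressionTypeException("zstd not supported by this fetch version"));
        System.out.println(mapped == Errors.UNSUPPORTED_COMPRESSION_TYPE); // true
    }
}
```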
diff --git a/clients/src/main/java/org/apache/kafka/common/record/AbstractLegacyRecordBatch.java b/clients/src/main/java/org/apache/kafka/common/record/AbstractLegacyRecordBatch.java
index 6ac073dd29801..cdf731c3bf2d0 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/AbstractLegacyRecordBatch.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/AbstractLegacyRecordBatch.java
@@ -322,6 +322,8 @@ private DeepRecordsIterator(AbstractLegacyRecordBatch wrapperEntry,
throw new InvalidRecordException("Invalid wrapper magic found in legacy deep record iterator " + wrapperMagic);
CompressionType compressionType = wrapperRecord.compressionType();
+ // ZSTD is only defined for the v2 message format, so a legacy wrapper batch claiming zstd is invalid.
+ if (compressionType == CompressionType.ZSTD)
+ throw new InvalidRecordException("Invalid wrapper compressionType found in legacy deep record iterator " + compressionType);
ByteBuffer wrapperValue = wrapperRecord.value();
if (wrapperValue == null)
throw new InvalidRecordException("Found invalid compressed record set with null value (magic = " +
diff --git a/clients/src/main/java/org/apache/kafka/common/record/CompressionType.java b/clients/src/main/java/org/apache/kafka/common/record/CompressionType.java
index a333d2aea2799..352d12d834977 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/CompressionType.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/CompressionType.java
@@ -113,6 +113,26 @@ public InputStream wrapForInput(ByteBuffer inputBuffer, byte messageVersion, Buf
throw new KafkaException(e);
}
}
+ },
+
+ ZSTD(4, "zstd", 1.0f) {
+ @Override
+ public OutputStream wrapForOutput(ByteBufferOutputStream buffer, byte messageVersion) {
+ try {
+ return (OutputStream) ZstdConstructors.OUTPUT.invoke(buffer);
+ } catch (Throwable e) {
+ throw new KafkaException(e);
+ }
+ }
+
+ @Override
+ public InputStream wrapForInput(ByteBuffer buffer, byte messageVersion, BufferSupplier decompressionBufferSupplier) {
+ try {
+ return (InputStream) ZstdConstructors.INPUT.invoke(new ByteBufferInputStream(buffer));
+ } catch (Throwable e) {
+ throw new KafkaException(e);
+ }
+ }
};
public final int id;
@@ -156,6 +176,8 @@ public static CompressionType forId(int id) {
return SNAPPY;
case 3:
return LZ4;
+ case 4:
+ return ZSTD;
default:
throw new IllegalArgumentException("Unknown compression type id: " + id);
}
@@ -170,6 +192,8 @@ else if (SNAPPY.name.equals(name))
return SNAPPY;
else if (LZ4.name.equals(name))
return LZ4;
+ else if (ZSTD.name.equals(name))
+ return ZSTD;
else
throw new IllegalArgumentException("Unknown compression name: " + name);
}
@@ -177,7 +201,7 @@ else if (LZ4.name.equals(name))
// We should only have a runtime dependency on compression algorithms in case the native libraries don't support
// some platforms.
//
- // For Snappy, we dynamically load the classes and rely on the initialization-on-demand holder idiom to ensure
+ // For Snappy and Zstd, we dynamically load the classes and rely on the initialization-on-demand holder idiom to ensure
// they're only loaded if used.
//
// For LZ4 we are using org.apache.kafka classes, which should always be in the classpath, and would not trigger
@@ -190,6 +214,13 @@ private static class SnappyConstructors {
MethodType.methodType(void.class, OutputStream.class));
}
+ private static class ZstdConstructors {
+ static final MethodHandle INPUT = findConstructor("com.github.luben.zstd.ZstdInputStream",
+ MethodType.methodType(void.class, InputStream.class));
+ static final MethodHandle OUTPUT = findConstructor("com.github.luben.zstd.ZstdOutputStream",
+ MethodType.methodType(void.class, OutputStream.class));
+ }
+
private static MethodHandle findConstructor(String className, MethodType methodType) {
try {
return MethodHandles.publicLookup().findConstructor(Class.forName(className), methodType);
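The holder idiom above relies on lazy JVM class initialization: ZstdConstructors is only initialized, and zstd-jni only resolved, the first time a zstd stream is wrapped, so the library stays an optional runtime dependency. A self-contained sketch of the same pattern, with a hypothetical codec class name standing in for the optional dependency:

```java
import java.io.InputStream;
import java.lang.invoke.MethodHandle;
import java.lang.invoke.MethodHandles;
import java.lang.invoke.MethodType;

public class LazyCodecLoading {
    // Initialization-on-demand holder: the JVM initializes this class (and
    // performs the reflective lookup) only on first use of wrap().
    private static class Constructors {
        static final MethodHandle INPUT = findConstructor(
                "com.example.CodecInputStream", // hypothetical optional dependency
                MethodType.methodType(void.class, InputStream.class));
    }

    private static MethodHandle findConstructor(String className, MethodType type) {
        try {
            return MethodHandles.publicLookup().findConstructor(Class.forName(className), type);
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException(e);
        }
    }

    public static InputStream wrap(InputStream in) {
        try {
            return (InputStream) Constructors.INPUT.invoke(in);
        } catch (Throwable t) {
            throw new IllegalStateException(t);
        }
    }
}
```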
diff --git a/clients/src/main/java/org/apache/kafka/common/record/LazyDownConversionRecords.java b/clients/src/main/java/org/apache/kafka/common/record/LazyDownConversionRecords.java
index d58689de119a8..4e844736bbd38 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/LazyDownConversionRecords.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/LazyDownConversionRecords.java
@@ -44,6 +44,9 @@ public class LazyDownConversionRecords implements BaseRecords {
* @param firstOffset The starting offset for down-converted records. This only impacts some cases. See
* {@link RecordsUtil#downConvert(Iterable, byte, long, Time)} for an explanation.
* @param time The time instance to use
+ *
+ * @throws org.apache.kafka.common.errors.UnsupportedCompressionTypeException If the first batch to down-convert
+ * has a compression type for which down-conversion is not supported.
*/
public LazyDownConversionRecords(TopicPartition topicPartition, Records records, byte toMagic, long firstOffset, Time time) {
this.topicPartition = Objects.requireNonNull(topicPartition);
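A hedged usage sketch of that documented contract: down-converting zstd data to a legacy magic should be refused. Setup values are illustrative, and whether the throw happens in the constructor or on first iteration is an assumption:

```java
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.UnsupportedCompressionTypeException;
import org.apache.kafka.common.record.CompressionType;
import org.apache.kafka.common.record.LazyDownConversionRecords;
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.record.RecordBatch;
import org.apache.kafka.common.record.SimpleRecord;
import org.apache.kafka.common.utils.Time;

public class DownConversionCheck {
    public static void main(String[] args) {
        MemoryRecords zstdRecords = MemoryRecords.withRecords(
                CompressionType.ZSTD, new SimpleRecord("v".getBytes()));
        try {
            // Legacy formats cannot carry zstd, so requesting magic v1 should fail.
            new LazyDownConversionRecords(new TopicPartition("t", 0), zstdRecords,
                    RecordBatch.MAGIC_VALUE_V1, 0L, Time.SYSTEM);
        } catch (UnsupportedCompressionTypeException e) {
            System.out.println("Down-conversion refused: " + e.getMessage());
        }
    }
}
```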
@@ -150,7 +153,7 @@ protected ConvertedRecords makeNext() {
}
while (batchIterator.hasNext()) {
- List