From d4b2bf484a3a6e846fa40c3464e813252b5ccb7b Mon Sep 17 00:00:00 2001 From: Lee Dongjin Date: Fri, 16 Dec 2016 17:50:20 +0900 Subject: [PATCH 01/22] KAFKA-4514: Add Codec for ZStandard Compression --- build.gradle | 1 + .../clients/producer/ProducerConfig.java | 2 +- .../kafka/common/record/CompressionType.java | 34 ++++++++++++++++++- config/producer.properties | 2 +- .../kafka/message/CompressionCodec.scala | 9 ++++- .../main/scala/kafka/server/KafkaConfig.scala | 2 +- .../scala/kafka/tools/ConsoleProducer.scala | 2 +- .../kafka/api/ProducerCompressionTest.scala | 3 +- docs/design.html | 2 +- docs/implementation.html | 1 + gradle/dependencies.gradle | 6 ++-- 11 files changed, 54 insertions(+), 10 deletions(-) diff --git a/build.gradle b/build.gradle index 47fa18620fd4e..2651f10da6521 100644 --- a/build.gradle +++ b/build.gradle @@ -820,6 +820,7 @@ project(':clients') { conf2ScopeMappings.addMapping(1000, configurations.jacksonDatabindConfig, "provided") dependencies { + compile libs.zstd compile libs.lz4 compile libs.snappy compile libs.slf4jApi diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java index 1a1bab5127bc8..6142519c4dcaa 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java @@ -159,7 +159,7 @@ public class ProducerConfig extends AbstractConfig { /** compression.type */ public static final String COMPRESSION_TYPE_CONFIG = "compression.type"; private static final String COMPRESSION_TYPE_DOC = "The compression type for all data generated by the producer. The default is none (i.e. no compression). Valid " - + " values are none, gzip, snappy, or lz4. " + + " values are none, gzip, snappy, lz4, or zstd. " + "Compression is of full batches of data, so the efficacy of batching will also impact the compression ratio (more batching means better compression)."; /** metrics.sample.window.ms */ diff --git a/clients/src/main/java/org/apache/kafka/common/record/CompressionType.java b/clients/src/main/java/org/apache/kafka/common/record/CompressionType.java index a333d2aea2799..ed41aed7b5e15 100644 --- a/clients/src/main/java/org/apache/kafka/common/record/CompressionType.java +++ b/clients/src/main/java/org/apache/kafka/common/record/CompressionType.java @@ -14,6 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.kafka.common.record; import org.apache.kafka.common.KafkaException; @@ -113,6 +114,26 @@ public InputStream wrapForInput(ByteBuffer inputBuffer, byte messageVersion, Buf throw new KafkaException(e); } } + }, + + ZSTD(4, "zstd", 1.0f) { + @Override + public OutputStream wrapForOutput(ByteBufferOutputStream buffer, byte messageVersion) { + try { + return (OutputStream) ZstdConstructors.OUTPUT.invoke(buffer); + } catch (Throwable e) { + throw new KafkaException(e); + } + } + + @Override + public InputStream wrapForInput(ByteBuffer buffer, byte messageVersion, BufferSupplier decompressionBufferSupplier) { + try { + return (InputStream) ZstdConstructors.INPUT.invoke(new ByteBufferInputStream(buffer)); + } catch (Throwable e) { + throw new KafkaException(e); + } + } }; public final int id; @@ -156,6 +177,8 @@ public static CompressionType forId(int id) { return SNAPPY; case 3: return LZ4; + case 4: + return ZSTD; default: throw new IllegalArgumentException("Unknown compression type id: " + id); } @@ -170,6 +193,8 @@ else if (SNAPPY.name.equals(name)) return SNAPPY; else if (LZ4.name.equals(name)) return LZ4; + else if (ZSTD.name.equals(name)) + return ZSTD; else throw new IllegalArgumentException("Unknown compression name: " + name); } @@ -177,7 +202,7 @@ else if (LZ4.name.equals(name)) // We should only have a runtime dependency on compression algorithms in case the native libraries don't support // some platforms. // - // For Snappy, we dynamically load the classes and rely on the initialization-on-demand holder idiom to ensure + // For Snappy and ZSTD, we dynamically load the classes and rely on the initialization-on-demand holder idiom to ensure // they're only loaded if used. // // For LZ4 we are using org.apache.kafka classes, which should always be in the classpath, and would not trigger @@ -190,6 +215,13 @@ private static class SnappyConstructors { MethodType.methodType(void.class, OutputStream.class)); } + private static class ZstdConstructors { + static final MethodHandle INPUT = findConstructor("com.github.luben.zstd.ZstdInputStream", + MethodType.methodType(void.class, InputStream.class)); + static final MethodHandle OUTPUT = findConstructor("com.github.luben.zstd.ZstdOutputStream", + MethodType.methodType(void.class, OutputStream.class)); + } + private static MethodHandle findConstructor(String className, MethodType methodType) { try { return MethodHandles.publicLookup().findConstructor(Class.forName(className), methodType); diff --git a/config/producer.properties b/config/producer.properties index 750b95ee0aeb7..4786b988a29b8 100644 --- a/config/producer.properties +++ b/config/producer.properties @@ -20,7 +20,7 @@ # format: host1:port1,host2:port2 ... bootstrap.servers=localhost:9092 -# specify the compression codec for all data generated: none, gzip, snappy, lz4 +# specify the compression codec for all data generated: none, gzip, snappy, lz4, zstd compression.type=none # name of the partitioner class for partitioning events; default partition spreads data randomly diff --git a/core/src/main/scala/kafka/message/CompressionCodec.scala b/core/src/main/scala/kafka/message/CompressionCodec.scala index 64e0aaa72a16a..abe3694b0359c 100644 --- a/core/src/main/scala/kafka/message/CompressionCodec.scala +++ b/core/src/main/scala/kafka/message/CompressionCodec.scala @@ -28,6 +28,7 @@ object CompressionCodec { case GZIPCompressionCodec.codec => GZIPCompressionCodec case SnappyCompressionCodec.codec => SnappyCompressionCodec case LZ4CompressionCodec.codec => LZ4CompressionCodec + case ZStdCompressionCodec.codec => ZStdCompressionCodec case _ => throw new UnknownCodecException("%d is an unknown compression codec".format(codec)) } } @@ -37,6 +38,7 @@ object CompressionCodec { case GZIPCompressionCodec.name => GZIPCompressionCodec case SnappyCompressionCodec.name => SnappyCompressionCodec case LZ4CompressionCodec.name => LZ4CompressionCodec + case ZStdCompressionCodec.name => ZStdCompressionCodec case _ => throw new kafka.common.UnknownCodecException("%s is an unknown compression codec".format(name)) } } @@ -44,7 +46,7 @@ object CompressionCodec { object BrokerCompressionCodec { - val brokerCompressionCodecs = List(UncompressedCodec, SnappyCompressionCodec, LZ4CompressionCodec, GZIPCompressionCodec, ProducerCompressionCodec) + val brokerCompressionCodecs = List(UncompressedCodec, ZStdCompressionCodec, LZ4CompressionCodec, SnappyCompressionCodec, GZIPCompressionCodec, ProducerCompressionCodec) val brokerCompressionOptions = brokerCompressionCodecs.map(codec => codec.name) def isValid(compressionType: String): Boolean = brokerCompressionOptions.contains(compressionType.toLowerCase(Locale.ROOT)) @@ -87,6 +89,11 @@ case object LZ4CompressionCodec extends CompressionCodec with BrokerCompressionC val name = "lz4" } +case object ZStdCompressionCodec extends CompressionCodec with BrokerCompressionCodec { + val codec = 4 + val name = "zstd" +} + case object NoCompressionCodec extends CompressionCodec with BrokerCompressionCodec { val codec = 0 val name = "none" diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala index 700f32c534f07..9edda4ea7f114 100755 --- a/core/src/main/scala/kafka/server/KafkaConfig.scala +++ b/core/src/main/scala/kafka/server/KafkaConfig.scala @@ -720,7 +720,7 @@ object KafkaConfig { val DeleteTopicEnableDoc = "Enables delete topic. Delete topic through the admin tool will have no effect if this config is turned off" val CompressionTypeDoc = "Specify the final compression type for a given topic. This configuration accepts the standard compression codecs " + - "('gzip', 'snappy', 'lz4'). It additionally accepts 'uncompressed' which is equivalent to no compression; and " + + "('gzip', 'snappy', 'lz4', 'zstd'). It additionally accepts 'uncompressed' which is equivalent to no compression; and " + "'producer' which means retain the original compression codec set by the producer." /** ********* Kafka Metrics Configuration ***********/ diff --git a/core/src/main/scala/kafka/tools/ConsoleProducer.scala b/core/src/main/scala/kafka/tools/ConsoleProducer.scala index 8d8c42d36cf07..a90999545646e 100644 --- a/core/src/main/scala/kafka/tools/ConsoleProducer.scala +++ b/core/src/main/scala/kafka/tools/ConsoleProducer.scala @@ -120,7 +120,7 @@ object ConsoleProducer { .describedAs("broker-list") .ofType(classOf[String]) val syncOpt = parser.accepts("sync", "If set message send requests to the brokers are synchronously, one at a time as they arrive.") - val compressionCodecOpt = parser.accepts("compression-codec", "The compression codec: either 'none', 'gzip', 'snappy', or 'lz4'." + + val compressionCodecOpt = parser.accepts("compression-codec", "The compression codec: either 'none', 'gzip', 'snappy', 'lz4', or 'zstd'." + "If specified without value, then it defaults to 'gzip'") .withOptionalArg() .describedAs("compression-codec") diff --git a/core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala b/core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala index bb49884686be6..24193521f91ee 100755 --- a/core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala +++ b/core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala @@ -113,7 +113,8 @@ object ProducerCompressionTest { Array("none"), Array("gzip"), Array("snappy"), - Array("lz4") + Array("lz4"), + Array("zstd") ).asJava } } diff --git a/docs/design.html b/docs/design.html index bdc7e637ea9b8..0061a53c49d8c 100644 --- a/docs/design.html +++ b/docs/design.html @@ -136,7 +136,7 @@

End-to-end Batch Compr Kafka supports this with an efficient batching format. A batch of messages can be clumped together compressed and sent to the server in this form. This batch of messages will be written in compressed form and will remain compressed in the log and will only be decompressed by the consumer.

- Kafka supports GZIP, Snappy and LZ4 compression protocols. More details on compression can be found here. + Kafka supports GZIP, Snappy, LZ4 and ZStandard compression protocols. More details on compression can be found here.

4.4 The Producer

diff --git a/docs/implementation.html b/docs/implementation.html index 4ecce7b448578..35ac6696a06b6 100644 --- a/docs/implementation.html +++ b/docs/implementation.html @@ -44,6 +44,7 @@

5.3.1 Record Batch

1: gzip 2: snappy 3: lz4 + 4: zstd bit 3: timestampType bit 4: isTransactional (0 means not transactional) bit 5: isControlBatch (0 means not a control batch) diff --git a/gradle/dependencies.gradle b/gradle/dependencies.gradle index 23fc68aaba517..70ae2d6ee8f00 100644 --- a/gradle/dependencies.gradle +++ b/gradle/dependencies.gradle @@ -83,7 +83,8 @@ versions += [ slf4j: "1.7.25", snappy: "1.1.7.2", zkclient: "0.10", - zookeeper: "3.4.13" + zookeeper: "3.4.13", + zstd: "1.3.3-4" ] libs += [ @@ -141,5 +142,6 @@ libs += [ zkclient: "com.101tec:zkclient:$versions.zkclient", zookeeper: "org.apache.zookeeper:zookeeper:$versions.zookeeper", jfreechart: "jfreechart:jfreechart:$versions.jfreechart", - mavenArtifact: "org.apache.maven:maven-artifact:$versions.mavenArtifact" + mavenArtifact: "org.apache.maven:maven-artifact:$versions.mavenArtifact", + zstd: "com.github.luben:zstd-jni:$versions.zstd", ] From bda05f76db05c14ae0d683a12d9853d31b3be05a Mon Sep 17 00:00:00 2001 From: Lee Dongjin Date: Sat, 10 Mar 2018 19:03:57 +0900 Subject: [PATCH 02/22] Minor updates 1. zstd version updated: 1.1.1 -> 1.3.3-4. 2. Fix indentation: implementation.html, MessageCompressionTest.scala. 3. Add zstd to TopicConfig.java. 4. Add zstd to compression_test.py. --- .../main/java/org/apache/kafka/common/config/TopicConfig.java | 2 +- docs/implementation.html | 2 +- tests/kafkatest/tests/client/compression_test.py | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/common/config/TopicConfig.java b/clients/src/main/java/org/apache/kafka/common/config/TopicConfig.java index bc657ea738f69..4410c971a169f 100755 --- a/clients/src/main/java/org/apache/kafka/common/config/TopicConfig.java +++ b/clients/src/main/java/org/apache/kafka/common/config/TopicConfig.java @@ -140,7 +140,7 @@ public class TopicConfig { public static final String COMPRESSION_TYPE_CONFIG = "compression.type"; public static final String COMPRESSION_TYPE_DOC = "Specify the final compression type for a given topic. " + - "This configuration accepts the standard compression codecs ('gzip', 'snappy', lz4). It additionally " + + "This configuration accepts the standard compression codecs ('gzip', 'snappy', 'lz4', 'zstd'). It additionally " + "accepts 'uncompressed' which is equivalent to no compression; and 'producer' which means retain the " + "original compression codec set by the producer."; diff --git a/docs/implementation.html b/docs/implementation.html index 35ac6696a06b6..cc2b0f47ffe81 100644 --- a/docs/implementation.html +++ b/docs/implementation.html @@ -44,7 +44,7 @@

5.3.1 Record Batch

1: gzip 2: snappy 3: lz4 - 4: zstd + 4: zstd bit 3: timestampType bit 4: isTransactional (0 means not transactional) bit 5: isControlBatch (0 means not a control batch) diff --git a/tests/kafkatest/tests/client/compression_test.py b/tests/kafkatest/tests/client/compression_test.py index 165e11add1933..2085d9b6259c7 100644 --- a/tests/kafkatest/tests/client/compression_test.py +++ b/tests/kafkatest/tests/client/compression_test.py @@ -54,7 +54,7 @@ def min_cluster_size(self): return super(CompressionTest, self).min_cluster_size() + self.num_producers + self.num_consumers @cluster(num_nodes=7) - @parametrize(compression_types=["snappy","gzip","lz4","none"]) + @parametrize(compression_types=["snappy","gzip","lz4","zstd","none"]) def test_compressed_topic(self, compression_types): """Test produce => consume => validate for compressed topics Setup: 1 zk, 1 kafka node, 1 topic with partitions=10, replication-factor=1 From 82ba81a15bfe4f84e42a756c550896facdcc8e60 Mon Sep 17 00:00:00 2001 From: Lee Dongjin Date: Wed, 13 Jun 2018 21:06:47 +0900 Subject: [PATCH 03/22] Update ZStd version: 1.3.3 -> 1.3.4 (zstd-jni 1.3.4-10) --- .../java/org/apache/kafka/common/record/CompressionType.java | 2 +- gradle/dependencies.gradle | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/common/record/CompressionType.java b/clients/src/main/java/org/apache/kafka/common/record/CompressionType.java index ed41aed7b5e15..ab91c8028bad5 100644 --- a/clients/src/main/java/org/apache/kafka/common/record/CompressionType.java +++ b/clients/src/main/java/org/apache/kafka/common/record/CompressionType.java @@ -202,7 +202,7 @@ else if (ZSTD.name.equals(name)) // We should only have a runtime dependency on compression algorithms in case the native libraries don't support // some platforms. // - // For Snappy and ZSTD, we dynamically load the classes and rely on the initialization-on-demand holder idiom to ensure + // For Snappy and Zstd, we dynamically load the classes and rely on the initialization-on-demand holder idiom to ensure // they're only loaded if used. // // For LZ4 we are using org.apache.kafka classes, which should always be in the classpath, and would not trigger diff --git a/gradle/dependencies.gradle b/gradle/dependencies.gradle index 70ae2d6ee8f00..7364158fb1571 100644 --- a/gradle/dependencies.gradle +++ b/gradle/dependencies.gradle @@ -84,7 +84,7 @@ versions += [ snappy: "1.1.7.2", zkclient: "0.10", zookeeper: "3.4.13", - zstd: "1.3.3-4" + zstd: "1.3.4-10" ] libs += [ From 51bdf15e300b24f5d18cb320ef07adcf56fa5581 Mon Sep 17 00:00:00 2001 From: Lee Dongjin Date: Sat, 14 Jul 2018 22:15:56 +0900 Subject: [PATCH 04/22] Update ZStd version: 1.3.4 -> 1.3.5 (zstd-jni 1.3.5-2) --- gradle/dependencies.gradle | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/gradle/dependencies.gradle b/gradle/dependencies.gradle index 7364158fb1571..f2a2a0510931b 100644 --- a/gradle/dependencies.gradle +++ b/gradle/dependencies.gradle @@ -84,7 +84,7 @@ versions += [ snappy: "1.1.7.2", zkclient: "0.10", zookeeper: "3.4.13", - zstd: "1.3.4-10" + zstd: "1.3.5-2" ] libs += [ From 35dcffa5d1c391f74913efc894c73019261f1f7e Mon Sep 17 00:00:00 2001 From: Lee Dongjin Date: Mon, 13 Aug 2018 07:25:23 +0900 Subject: [PATCH 05/22] Add license description of zstd and zstd-jni --- LICENSE | 66 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 66 insertions(+) diff --git a/LICENSE b/LICENSE index bf7fe1c487b38..db706caf09e7f 100644 --- a/LICENSE +++ b/LICENSE @@ -201,6 +201,7 @@ See the License for the specific language governing permissions and limitations under the License. +------------------------------------------------------------------------------------ This distribution has a binary dependency on jersey, which is available under the CDDL License as described below. @@ -328,3 +329,68 @@ As between Initial Developer and the Contributors, each party is responsible for NOTICE PURSUANT TO SECTION 9 OF THE COMMON DEVELOPMENT AND DISTRIBUTION LICENSE (CDDL) The code released under the CDDL shall be governed by the laws of the State of California (excluding conflict-of-law provisions). Any litigation relating to this License shall be subject to the jurisdiction of the Federal Courts of the Northern District of California and the state courts of the State of California, with venue lying in Santa Clara County, California. + +------------------------------------------------------------------------------------ +This distribution has a binary dependency on zstd, which is available under the BSD 3-Clause License as described below. + +BSD License + +For Zstandard software + +Copyright (c) 2016-present, Facebook, Inc. All rights reserved. + +Redistribution and use in source and binary forms, with or without modification, +are permitted provided that the following conditions are met: + + * Redistributions of source code must retain the above copyright notice, this + list of conditions and the following disclaimer. + + * Redistributions in binary form must reproduce the above copyright notice, + this list of conditions and the following disclaimer in the documentation + and/or other materials provided with the distribution. + + * Neither the name Facebook nor the names of its contributors may be used to + endorse or promote products derived from this software without specific + prior written permission. + +THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND +ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED +WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR +ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES +(INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; +LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON +ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS +SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +------------------------------------------------------------------------------------ +This distribution has a binary dependency on zstd-jni, which is available under the BSD 2-Clause License +as described below. + +Zstd-jni: JNI bindings to Zstd Library + +Copyright (c) 2015-2016, Luben Karavelov/ All rights reserved. + +BSD License + +Redistribution and use in source and binary forms, with or without modification, +are permitted provided that the following conditions are met: + +* Redistributions of source code must retain the above copyright notice, this + list of conditions and the following disclaimer. + +* Redistributions in binary form must reproduce the above copyright notice, this + list of conditions and the following disclaimer in the documentation and/or + other materials provided with the distribution. + +THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND +ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED +WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR +ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES +(INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; +LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON +ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS +SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. From 8872bdf22332cc5a90973aff5bb7ac180735a625 Mon Sep 17 00:00:00 2001 From: Lee Dongjin Date: Fri, 31 Aug 2018 21:19:13 +0900 Subject: [PATCH 06/22] Disallow fetch request with API version < 10 for ZStandard compressed partition 1. Add Errors.UNSUPPORTED_COMPRESSION_TYPE with corresponding Exception, UnsupportedCompressionTypeException. 2. Define FetchRequest.FETCH_REQUEST_V10. (same as FETCH_REQUEST_V9) 3. Throw Errors.UNSUPPORTED_COMPRESSION_TYPE when fetch request for a ZStandard compressed partition with API version < 10 is detected. --- .../UnsupportedCompressionTypeException.java | 34 +++++++++++++++++++ .../apache/kafka/common/protocol/Errors.java | 5 ++- .../kafka/common/requests/FetchRequest.java | 6 +++- .../kafka/common/requests/FetchResponse.java | 5 ++- .../src/main/scala/kafka/api/ApiVersion.scala | 10 +++++- .../main/scala/kafka/server/KafkaApis.scala | 31 ++++++++++------- .../kafka/server/ReplicaFetcherThread.scala | 2 +- .../scala/unit/kafka/api/ApiVersionTest.scala | 3 +- 8 files changed, 78 insertions(+), 18 deletions(-) create mode 100644 clients/src/main/java/org/apache/kafka/common/errors/UnsupportedCompressionTypeException.java diff --git a/clients/src/main/java/org/apache/kafka/common/errors/UnsupportedCompressionTypeException.java b/clients/src/main/java/org/apache/kafka/common/errors/UnsupportedCompressionTypeException.java new file mode 100644 index 0000000000000..29ffe1b900e23 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/errors/UnsupportedCompressionTypeException.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.common.errors; + +/** + * The requesting client does not support the compression type of given partition. + */ +public class UnsupportedCompressionTypeException extends ApiException { + + private static final long serialVersionUID = 1L; + + public UnsupportedCompressionTypeException(String message) { + super(message); + } + + public UnsupportedCompressionTypeException(String message, Throwable cause) { + super(message, cause); + } + +} diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java index 7cf5a29508463..6b54a89daf309 100644 --- a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java +++ b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java @@ -91,6 +91,7 @@ import org.apache.kafka.common.errors.UnknownServerException; import org.apache.kafka.common.errors.UnknownTopicOrPartitionException; import org.apache.kafka.common.errors.UnsupportedByAuthenticationException; +import org.apache.kafka.common.errors.UnsupportedCompressionTypeException; import org.apache.kafka.common.errors.UnsupportedForMessageFormatException; import org.apache.kafka.common.errors.UnsupportedSaslMechanismException; import org.apache.kafka.common.errors.UnsupportedVersionException; @@ -284,7 +285,9 @@ public enum Errors { FENCED_LEADER_EPOCH(74, "The leader epoch in the request is older than the epoch on the broker", FencedLeaderEpochException::new), UNKNOWN_LEADER_EPOCH(75, "The leader epoch in the request is newer than the epoch on the broker", - UnknownLeaderEpochException::new); + UnknownLeaderEpochException::new), + UNSUPPORTED_COMPRESSION_TYPE(76, "The requesting client does not support the compression type of given partition.", + UnsupportedCompressionTypeException::new); private static final Logger log = LoggerFactory.getLogger(Errors.class); diff --git a/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java index 32eb24d7de2d1..8d94bfd2134ea 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java @@ -191,9 +191,13 @@ public class FetchRequest extends AbstractRequest { FETCH_REQUEST_TOPIC_V9, FORGOTTEN_TOPIC_DATA_V7); + // V10 bumped up to indicate ZStandard capability. (see KIP-110) + private static final Schema FETCH_REQUEST_V10 = FETCH_REQUEST_V9; + public static Schema[] schemaVersions() { return new Schema[]{FETCH_REQUEST_V0, FETCH_REQUEST_V1, FETCH_REQUEST_V2, FETCH_REQUEST_V3, FETCH_REQUEST_V4, - FETCH_REQUEST_V5, FETCH_REQUEST_V6, FETCH_REQUEST_V7, FETCH_REQUEST_V8, FETCH_REQUEST_V9}; + FETCH_REQUEST_V5, FETCH_REQUEST_V6, FETCH_REQUEST_V7, FETCH_REQUEST_V8, FETCH_REQUEST_V9, + FETCH_REQUEST_V10}; } // default values for older versions where a request level limit did not exist diff --git a/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java index f87f2ef05d382..cf72664522244 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java @@ -180,10 +180,13 @@ public class FetchResponse extends AbstractResponse { // V9 adds the current leader epoch (see KIP-320) private static final Schema FETCH_RESPONSE_V9 = FETCH_RESPONSE_V8; + // V10 bumped up to indicate ZStandard capability. (see KIP-110) + private static final Schema FETCH_RESPONSE_V10 = FETCH_RESPONSE_V9; + public static Schema[] schemaVersions() { return new Schema[] {FETCH_RESPONSE_V0, FETCH_RESPONSE_V1, FETCH_RESPONSE_V2, FETCH_RESPONSE_V3, FETCH_RESPONSE_V4, FETCH_RESPONSE_V5, FETCH_RESPONSE_V6, - FETCH_RESPONSE_V7, FETCH_RESPONSE_V8, FETCH_RESPONSE_V9}; + FETCH_RESPONSE_V7, FETCH_RESPONSE_V8, FETCH_RESPONSE_V9, FETCH_RESPONSE_V10}; } public static final long INVALID_HIGHWATERMARK = -1L; diff --git a/core/src/main/scala/kafka/api/ApiVersion.scala b/core/src/main/scala/kafka/api/ApiVersion.scala index bc3602bd69a95..0e0385143429b 100644 --- a/core/src/main/scala/kafka/api/ApiVersion.scala +++ b/core/src/main/scala/kafka/api/ApiVersion.scala @@ -79,7 +79,9 @@ object ApiVersion { // Introduced new schemas for group offset (v2) and group metadata (v2) (KIP-211) KAFKA_2_1_IV0, // New Fetch, OffsetsForLeaderEpoch, and ListOffsets schemas (KIP-320) - KAFKA_2_1_IV1 + KAFKA_2_1_IV1, + // Support ZStandard Compression Codec (KIP-110) + KAFKA_2_1_IV2 ) // Map keys are the union of the short and full versions @@ -270,6 +272,12 @@ case object KAFKA_2_1_IV1 extends DefaultApiVersion { val id: Int = 18 } +case object KAFKA_2_1_IV2 extends DefaultApiVersion { + val shortVersion: String = "2.1" + val subVersion = "IV2" + val recordVersion = RecordVersion.V2 + val id: Int = 19 + object ApiVersionValidator extends Validator { override def ensureValid(name: String, value: Any): Unit = { diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index 18ab9db22ed6e..75f14943c1d91 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -31,6 +31,8 @@ import kafka.common.OffsetAndMetadata import kafka.controller.KafkaController import kafka.coordinator.group.{GroupCoordinator, JoinGroupResult} import kafka.coordinator.transaction.{InitProducerIdResult, TransactionCoordinator} +import kafka.log.{Log, LogManager, TimestampOffset} +import kafka.message.{CompressionCodec, NoCompressionCodec, ZStdCompressionCodec} import kafka.network.RequestChannel import kafka.security.SecurityUtils import kafka.security.auth.{Resource, _} @@ -558,18 +560,23 @@ class KafkaApis(val requestChannel: RequestChannel, trace(s"Conversion to message format ${downConvertMagic.get} is disabled for partition $tp. Sending unsupported version response to $clientId.") errorResponse(Errors.UNSUPPORTED_VERSION) } else { - val convertedRecords = - downConvertMagic.map { magic => - trace(s"Down converting records from partition $tp to message format version $magic for fetch request from $clientId") - // Because down-conversion is extremely memory intensive, we want to try and delay the down-conversion as much - // as possible. With KIP-283, we have the ability to lazily down-convert in a chunked manner. The lazy, chunked - // down-conversion always guarantees that at least one batch of messages is down-converted and sent out to the - // client. - new LazyDownConversionRecords(tp, unconvertedRecords, magic, fetchContext.getFetchOffset(tp).get, time) - }.getOrElse(unconvertedRecords) - new FetchResponse.PartitionData[BaseRecords](partitionData.error, partitionData.highWatermark, - FetchResponse.INVALID_LAST_STABLE_OFFSET, partitionData.logStartOffset, partitionData.abortedTransactions, - convertedRecords) + if (logConfig.forall(_.compressionType == "zstd") && versionId < 10) { + trace(s"Fetching messages is disabled for Zstd compressed partition $tp. Sending unsupported version response to $clientId.") + errorResponse(Errors.UNSUPPORTED_COMPRESSION_TYPE) + } else { + val convertedRecords = + downConvertMagic.map { magic => + trace(s"Down converting records from partition $tp to message format version $magic for fetch request from $clientId") + // Because down-conversion is extremely memory intensive, we want to try and delay the down-conversion as much + // as possible. With KIP-283, we have the ability to lazily down-convert in a chunked manner. The lazy, chunked + // down-conversion always guarantees that at least one batch of messages is down-converted and sent out to the + // client. + new LazyDownConversionRecords(tp, unconvertedRecords, magic, fetchContext.getFetchOffset(tp).get, time) + }.getOrElse(unconvertedRecords) + new FetchResponse.PartitionData[BaseRecords](partitionData.error, partitionData.highWatermark, + FetchResponse.INVALID_LAST_STABLE_OFFSET, partitionData.logStartOffset, partitionData.abortedTransactions, + convertedRecords) + } } } diff --git a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala index 930281a342cc4..aeeaf29516aef 100644 --- a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala +++ b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala @@ -62,7 +62,7 @@ class ReplicaFetcherThread(name: String, // Visible for testing private[server] val fetchRequestVersion: Short = - if (brokerConfig.interBrokerProtocolVersion >= KAFKA_2_1_IV1) 9 + if (brokerConfig.interBrokerProtocolVersion >= KAFKA_2_1_IV2) 10 else if (brokerConfig.interBrokerProtocolVersion >= KAFKA_2_0_IV1) 8 else if (brokerConfig.interBrokerProtocolVersion >= KAFKA_1_1_IV0) 7 else if (brokerConfig.interBrokerProtocolVersion >= KAFKA_0_11_0_IV1) 5 diff --git a/core/src/test/scala/unit/kafka/api/ApiVersionTest.scala b/core/src/test/scala/unit/kafka/api/ApiVersionTest.scala index d2d115b2cfaa7..1ffa695f48c52 100644 --- a/core/src/test/scala/unit/kafka/api/ApiVersionTest.scala +++ b/core/src/test/scala/unit/kafka/api/ApiVersionTest.scala @@ -80,9 +80,10 @@ class ApiVersionTest { assertEquals(KAFKA_2_0_IV0, ApiVersion("2.0-IV0")) assertEquals(KAFKA_2_0_IV1, ApiVersion("2.0-IV1")) - assertEquals(KAFKA_2_1_IV1, ApiVersion("2.1")) + assertEquals(KAFKA_2_1_IV2, ApiVersion("2.1")) assertEquals(KAFKA_2_1_IV0, ApiVersion("2.1-IV0")) assertEquals(KAFKA_2_1_IV1, ApiVersion("2.1-IV1")) + assertEquals(KAFKA_2_1_IV2, ApiVersion("2.1-IV2")) } @Test From fbdcabebe5588198470b078fee208e5f1a369833 Mon Sep 17 00:00:00 2001 From: Lee Dongjin Date: Mon, 17 Sep 2018 01:22:58 +0900 Subject: [PATCH 07/22] Disallow produce request with API version < 7 for ZStandard compressed partition --- .../org/apache/kafka/common/requests/ProduceRequest.java | 9 ++++++++- .../apache/kafka/common/requests/ProduceResponse.java | 7 ++++++- core/src/main/scala/kafka/server/KafkaApis.scala | 6 +++++- 3 files changed, 19 insertions(+), 3 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java index 4f1d766b8bf3e..69e35e498045a 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java @@ -113,9 +113,14 @@ public class ProduceRequest extends AbstractRequest { */ private static final Schema PRODUCE_REQUEST_V6 = PRODUCE_REQUEST_V5; + /** + * V7 bumped up to indicate ZStandard capability. (see KIP-110) + */ + private static final Schema PRODUCE_REQUEST_V7 = PRODUCE_REQUEST_V6; + public static Schema[] schemaVersions() { return new Schema[] {PRODUCE_REQUEST_V0, PRODUCE_REQUEST_V1, PRODUCE_REQUEST_V2, PRODUCE_REQUEST_V3, - PRODUCE_REQUEST_V4, PRODUCE_REQUEST_V5, PRODUCE_REQUEST_V6}; + PRODUCE_REQUEST_V4, PRODUCE_REQUEST_V5, PRODUCE_REQUEST_V6, PRODUCE_REQUEST_V7}; } public static class Builder extends AbstractRequest.Builder { @@ -330,6 +335,7 @@ public ProduceResponse getErrorResponse(int throttleTimeMs, Throwable e) { case 4: case 5: case 6: + case 7: return new ProduceResponse(responseMap, throttleTimeMs); default: throw new IllegalArgumentException(String.format("Version %d is not valid. Valid versions for %s are 0 to %d", @@ -400,6 +406,7 @@ public static byte requiredMagicForVersion(short produceRequestVersion) { case 4: case 5: case 6: + case 7: return RecordBatch.MAGIC_VALUE_V2; default: diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java index fb15813ccf1eb..7d3e4fed36280 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java @@ -144,9 +144,14 @@ public class ProduceResponse extends AbstractResponse { */ private static final Schema PRODUCE_RESPONSE_V6 = PRODUCE_RESPONSE_V5; + /** + * V7 bumped up to indicate ZStandard capability. (see KIP-110) + */ + private static final Schema PRODUCE_RESPONSE_V7 = PRODUCE_RESPONSE_V6; + public static Schema[] schemaVersions() { return new Schema[]{PRODUCE_RESPONSE_V0, PRODUCE_RESPONSE_V1, PRODUCE_RESPONSE_V2, PRODUCE_RESPONSE_V3, - PRODUCE_RESPONSE_V4, PRODUCE_RESPONSE_V5, PRODUCE_RESPONSE_V6}; + PRODUCE_RESPONSE_V4, PRODUCE_RESPONSE_V5, PRODUCE_RESPONSE_V6, PRODUCE_RESPONSE_V7}; } private final Map responses; diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index 75f14943c1d91..c8576fbbb6234 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -375,6 +375,7 @@ class KafkaApis(val requestChannel: RequestChannel, * Handle a produce request */ def handleProduceRequest(request: RequestChannel.Request) { + val versionId = request.header.apiVersion val produceRequest = request.body[ProduceRequest] val numBytesAppended = request.header.toStruct.sizeOf + request.sizeOfBodyInBytes @@ -392,6 +393,7 @@ class KafkaApis(val requestChannel: RequestChannel, val unauthorizedTopicResponses = mutable.Map[TopicPartition, PartitionResponse]() val nonExistingTopicResponses = mutable.Map[TopicPartition, PartitionResponse]() + val erroneousTopicResponses = mutable.Map[TopicPartition, PartitionResponse]() val authorizedRequestInfo = mutable.Map[TopicPartition, MemoryRecords]() for ((topicPartition, memoryRecords) <- produceRequest.partitionRecordsOrFail.asScala) { @@ -399,13 +401,15 @@ class KafkaApis(val requestChannel: RequestChannel, unauthorizedTopicResponses += topicPartition -> new PartitionResponse(Errors.TOPIC_AUTHORIZATION_FAILED) else if (!metadataCache.contains(topicPartition)) nonExistingTopicResponses += topicPartition -> new PartitionResponse(Errors.UNKNOWN_TOPIC_OR_PARTITION) + else if (replicaManager.getLogConfig(topicPartition).forall(_.compressionType == "zstd") && versionId < 7) + erroneousTopicResponses += (topicPartition -> new PartitionResponse(Errors.UNSUPPORTED_COMPRESSION_TYPE)) else authorizedRequestInfo += (topicPartition -> memoryRecords) } // the callback for sending a produce response def sendResponseCallback(responseStatus: Map[TopicPartition, PartitionResponse]) { - val mergedResponseStatus = responseStatus ++ unauthorizedTopicResponses ++ nonExistingTopicResponses + val mergedResponseStatus = responseStatus ++ unauthorizedTopicResponses ++ nonExistingTopicResponses ++ erroneousTopicResponses var errorInResponse = false mergedResponseStatus.foreach { case (topicPartition, status) => From 74d6b19c7ab4bc7f02b9145ab11b4854ed1954d9 Mon Sep 17 00:00:00 2001 From: Lee Dongjin Date: Fri, 21 Sep 2018 22:23:01 +0900 Subject: [PATCH 08/22] Disallow MemoryRecordsBuilder instantiation with ZSTD compression codec if magic is not v2 --- .../common/record/MemoryRecordsBuilder.java | 2 + .../producer/internals/ProducerBatchTest.java | 3 + .../common/record/FileLogInputStreamTest.java | 12 ++++ .../record/MemoryRecordsBuilderTest.java | 69 +++++++++++++++++++ .../common/record/MemoryRecordsTest.java | 50 ++++++++++++++ .../src/main/scala/kafka/api/ApiVersion.scala | 1 + ...gCleanerParameterizedIntegrationTest.scala | 8 +++ 7 files changed, 145 insertions(+) diff --git a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java index 1c7a6c746dfe2..0cc2cec31f902 100644 --- a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java +++ b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java @@ -102,6 +102,8 @@ public MemoryRecordsBuilder(ByteBufferOutputStream bufferStream, throw new IllegalArgumentException("Transactional records are not supported for magic " + magic); if (isControlBatch) throw new IllegalArgumentException("Control records are not supported for magic " + magic); + if (compressionType == CompressionType.ZSTD) + throw new IllegalArgumentException("ZStandard compression is not supported for magic " + magic); } this.magic = magic; diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/ProducerBatchTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/ProducerBatchTest.java index 6a85449a65985..f9887f9033e81 100644 --- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/ProducerBatchTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/ProducerBatchTest.java @@ -197,6 +197,9 @@ public void testSplitPreservesMagicAndCompressionType() { if (compressionType == CompressionType.NONE && magic < MAGIC_VALUE_V2) continue; + if (compressionType == CompressionType.ZSTD && magic < MAGIC_VALUE_V2) + continue; + MemoryRecordsBuilder builder = MemoryRecords.builder(ByteBuffer.allocate(1024), magic, compressionType, TimestampType.CREATE_TIME, 0L); diff --git a/clients/src/test/java/org/apache/kafka/common/record/FileLogInputStreamTest.java b/clients/src/test/java/org/apache/kafka/common/record/FileLogInputStreamTest.java index 8a955972b5c51..783a5b531ef27 100644 --- a/clients/src/test/java/org/apache/kafka/common/record/FileLogInputStreamTest.java +++ b/clients/src/test/java/org/apache/kafka/common/record/FileLogInputStreamTest.java @@ -56,6 +56,9 @@ public FileLogInputStreamTest(byte magic, CompressionType compression) { @Test public void testWriteTo() throws IOException { + if (compression == CompressionType.ZSTD && magic < MAGIC_VALUE_V2) + return; + try (FileRecords fileRecords = FileRecords.open(tempFile())) { fileRecords.append(MemoryRecords.withRecords(magic, compression, new SimpleRecord("foo".getBytes()))); fileRecords.flush(); @@ -81,6 +84,9 @@ public void testWriteTo() throws IOException { @Test public void testSimpleBatchIteration() throws IOException { + if (compression == CompressionType.ZSTD && magic < MAGIC_VALUE_V2) + return; + try (FileRecords fileRecords = FileRecords.open(tempFile())) { SimpleRecord firstBatchRecord = new SimpleRecord(3241324L, "a".getBytes(), "foo".getBytes()); SimpleRecord secondBatchRecord = new SimpleRecord(234280L, "b".getBytes(), "bar".getBytes()); @@ -108,6 +114,9 @@ public void testBatchIterationWithMultipleRecordsPerBatch() throws IOException { if (magic < MAGIC_VALUE_V2 && compression == CompressionType.NONE) return; + if (compression == CompressionType.ZSTD && magic < MAGIC_VALUE_V2) + return; + try (FileRecords fileRecords = FileRecords.open(tempFile())) { SimpleRecord[] firstBatchRecords = new SimpleRecord[]{ new SimpleRecord(3241324L, "a".getBytes(), "1".getBytes()), @@ -185,6 +194,9 @@ public void testBatchIterationV2() throws IOException { @Test public void testBatchIterationIncompleteBatch() throws IOException { + if (compression == CompressionType.ZSTD && magic < MAGIC_VALUE_V2) + return; + try (FileRecords fileRecords = FileRecords.open(tempFile())) { SimpleRecord firstBatchRecord = new SimpleRecord(100L, "foo".getBytes()); SimpleRecord secondBatchRecord = new SimpleRecord(200L, "bar".getBytes()); diff --git a/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsBuilderTest.java b/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsBuilderTest.java index 5d5221ecceac9..1050cf36d452e 100644 --- a/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsBuilderTest.java +++ b/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsBuilderTest.java @@ -19,7 +19,9 @@ import org.apache.kafka.common.utils.Time; import org.apache.kafka.common.utils.Utils; import org.apache.kafka.test.TestUtils; +import org.junit.Rule; import org.junit.Test; +import org.junit.rules.ExpectedException; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; @@ -39,6 +41,8 @@ @RunWith(value = Parameterized.class) public class MemoryRecordsBuilderTest { + @Rule + public ExpectedException exceptionRule = ExpectedException.none(); private final CompressionType compressionType; private final int bufferOffset; @@ -52,6 +56,11 @@ public MemoryRecordsBuilderTest(int bufferOffset, CompressionType compressionTyp @Test public void testWriteEmptyRecordSet() { + if (compressionType == CompressionType.ZSTD) { + exceptionRule.expect(IllegalArgumentException.class); + exceptionRule.expectMessage("ZStandard compression is not supported for magic 0"); + } + ByteBuffer buffer = ByteBuffer.allocate(128); buffer.position(bufferOffset); @@ -207,6 +216,11 @@ public void testWriteEndTxnMarkerNonControlBatch() { @Test public void testCompressionRateV0() { + if (compressionType == CompressionType.ZSTD) { + exceptionRule.expect(IllegalArgumentException.class); + exceptionRule.expectMessage("ZStandard compression is not supported for magic 0"); + } + ByteBuffer buffer = ByteBuffer.allocate(1024); buffer.position(bufferOffset); @@ -262,6 +276,11 @@ public void testEstimatedSizeInBytes() { @Test public void testCompressionRateV1() { + if (compressionType == CompressionType.ZSTD) { + exceptionRule.expect(IllegalArgumentException.class); + exceptionRule.expectMessage("ZStandard compression is not supported for magic 1"); + } + ByteBuffer buffer = ByteBuffer.allocate(1024); buffer.position(bufferOffset); @@ -293,6 +312,11 @@ public void testCompressionRateV1() { @Test public void buildUsingLogAppendTime() { + if (compressionType == CompressionType.ZSTD) { + exceptionRule.expect(IllegalArgumentException.class); + exceptionRule.expectMessage("ZStandard compression is not supported for magic 1"); + } + ByteBuffer buffer = ByteBuffer.allocate(1024); buffer.position(bufferOffset); @@ -322,6 +346,11 @@ public void buildUsingLogAppendTime() { @Test public void buildUsingCreateTime() { + if (compressionType == CompressionType.ZSTD) { + exceptionRule.expect(IllegalArgumentException.class); + exceptionRule.expectMessage("ZStandard compression is not supported for magic 1"); + } + ByteBuffer buffer = ByteBuffer.allocate(1024); buffer.position(bufferOffset); @@ -353,6 +382,11 @@ public void buildUsingCreateTime() { @Test public void testAppendedChecksumConsistency() { + if (compressionType == CompressionType.ZSTD) { + exceptionRule.expect(IllegalArgumentException.class); + exceptionRule.expectMessage("ZStandard compression is not supported for magic 0"); + } + ByteBuffer buffer = ByteBuffer.allocate(512); for (byte magic : Arrays.asList(RecordBatch.MAGIC_VALUE_V0, RecordBatch.MAGIC_VALUE_V1, RecordBatch.MAGIC_VALUE_V2)) { MemoryRecordsBuilder builder = new MemoryRecordsBuilder(buffer, magic, compressionType, @@ -397,6 +431,11 @@ public void testSmallWriteLimit() { @Test public void writePastLimit() { + if (compressionType == CompressionType.ZSTD) { + exceptionRule.expect(IllegalArgumentException.class); + exceptionRule.expectMessage("ZStandard compression is not supported for magic 1"); + } + ByteBuffer buffer = ByteBuffer.allocate(64); buffer.position(bufferOffset); @@ -442,6 +481,11 @@ public void testAppendAtInvalidOffset() { @Test public void convertV2ToV1UsingMixedCreateAndLogAppendTime() { + if (compressionType == CompressionType.ZSTD) { + exceptionRule.expect(IllegalArgumentException.class); + exceptionRule.expectMessage("ZStandard compression is not supported for magic 1"); + } + ByteBuffer buffer = ByteBuffer.allocate(512); MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, RecordBatch.MAGIC_VALUE_V2, compressionType, TimestampType.LOG_APPEND_TIME, 0L); @@ -497,6 +541,11 @@ public void convertV2ToV1UsingMixedCreateAndLogAppendTime() { @Test public void convertToV1WithMixedV0AndV2Data() { + if (compressionType == CompressionType.ZSTD) { + exceptionRule.expect(IllegalArgumentException.class); + exceptionRule.expectMessage("ZStandard compression is not supported for magic 0"); + } + ByteBuffer buffer = ByteBuffer.allocate(512); MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, RecordBatch.MAGIC_VALUE_V0, compressionType, TimestampType.NO_TIMESTAMP_TYPE, 0L); @@ -571,6 +620,11 @@ public void convertToV1WithMixedV0AndV2Data() { @Test public void shouldThrowIllegalStateExceptionOnBuildWhenAborted() throws Exception { + if (compressionType == CompressionType.ZSTD) { + exceptionRule.expect(IllegalArgumentException.class); + exceptionRule.expectMessage("ZStandard compression is not supported for magic 0"); + } + ByteBuffer buffer = ByteBuffer.allocate(128); buffer.position(bufferOffset); @@ -588,6 +642,11 @@ public void shouldThrowIllegalStateExceptionOnBuildWhenAborted() throws Exceptio @Test public void shouldResetBufferToInitialPositionOnAbort() throws Exception { + if (compressionType == CompressionType.ZSTD) { + exceptionRule.expect(IllegalArgumentException.class); + exceptionRule.expectMessage("ZStandard compression is not supported for magic 0"); + } + ByteBuffer buffer = ByteBuffer.allocate(128); buffer.position(bufferOffset); @@ -601,6 +660,11 @@ public void shouldResetBufferToInitialPositionOnAbort() throws Exception { @Test public void shouldThrowIllegalStateExceptionOnCloseWhenAborted() throws Exception { + if (compressionType == CompressionType.ZSTD) { + exceptionRule.expect(IllegalArgumentException.class); + exceptionRule.expectMessage("ZStandard compression is not supported for magic 0"); + } + ByteBuffer buffer = ByteBuffer.allocate(128); buffer.position(bufferOffset); @@ -618,6 +682,11 @@ public void shouldThrowIllegalStateExceptionOnCloseWhenAborted() throws Exceptio @Test public void shouldThrowIllegalStateExceptionOnAppendWhenAborted() throws Exception { + if (compressionType == CompressionType.ZSTD) { + exceptionRule.expect(IllegalArgumentException.class); + exceptionRule.expectMessage("ZStandard compression is not supported for magic 0"); + } + ByteBuffer buffer = ByteBuffer.allocate(128); buffer.position(bufferOffset); diff --git a/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java b/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java index 579fb74b44a83..aaafeeee544b4 100644 --- a/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java +++ b/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java @@ -22,7 +22,9 @@ import org.apache.kafka.common.record.MemoryRecords.RecordFilter.BatchRetention; import org.apache.kafka.common.utils.Utils; import org.apache.kafka.test.TestUtils; +import org.junit.Rule; import org.junit.Test; +import org.junit.rules.ExpectedException; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; @@ -33,6 +35,7 @@ import java.util.List; import static java.util.Arrays.asList; +import static org.apache.kafka.common.record.RecordBatch.MAGIC_VALUE_V2; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; @@ -42,6 +45,8 @@ @RunWith(value = Parameterized.class) public class MemoryRecordsTest { + @Rule + public ExpectedException exceptionRule = ExpectedException.none(); private CompressionType compression; private byte magic; @@ -69,6 +74,11 @@ public MemoryRecordsTest(byte magic, long firstOffset, CompressionType compressi @Test public void testIterator() { + if (compression == CompressionType.ZSTD && magic < MAGIC_VALUE_V2) { + exceptionRule.expect(IllegalArgumentException.class); + exceptionRule.expectMessage("ZStandard compression is not supported for magic " + magic); + } + ByteBuffer buffer = ByteBuffer.allocate(1024); MemoryRecordsBuilder builder = new MemoryRecordsBuilder(buffer, magic, compression, @@ -152,6 +162,11 @@ public void testIterator() { @Test public void testHasRoomForMethod() { + if (compression == CompressionType.ZSTD && magic < MAGIC_VALUE_V2) { + exceptionRule.expect(IllegalArgumentException.class); + exceptionRule.expectMessage("ZStandard compression is not supported for magic " + magic); + } + MemoryRecordsBuilder builder = MemoryRecords.builder(ByteBuffer.allocate(1024), magic, compression, TimestampType.CREATE_TIME, 0L); builder.append(0L, "a".getBytes(), "1".getBytes()); @@ -439,6 +454,11 @@ public void testBuildEndTxnMarker() { @Test public void testFilterToBatchDiscard() { if (compression != CompressionType.NONE || magic >= RecordBatch.MAGIC_VALUE_V2) { + if (compression == CompressionType.ZSTD && magic < MAGIC_VALUE_V2) { + exceptionRule.expect(IllegalArgumentException.class); + exceptionRule.expectMessage("ZStandard compression is not supported for magic " + magic); + } + ByteBuffer buffer = ByteBuffer.allocate(2048); MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, magic, compression, TimestampType.CREATE_TIME, 0L); builder.append(10L, "1".getBytes(), "a".getBytes()); @@ -489,6 +509,11 @@ protected boolean shouldRetainRecord(RecordBatch recordBatch, Record record) { @Test public void testFilterToAlreadyCompactedLog() { + if (compression == CompressionType.ZSTD && magic < MAGIC_VALUE_V2) { + exceptionRule.expect(IllegalArgumentException.class); + exceptionRule.expectMessage("ZStandard compression is not supported for magic " + magic); + } + ByteBuffer buffer = ByteBuffer.allocate(2048); // create a batch with some offset gaps to simulate a compacted batch @@ -629,6 +654,11 @@ public void testFilterToPreservesProducerInfo() { @Test public void testFilterToWithUndersizedBuffer() { + if (compression == CompressionType.ZSTD && magic < MAGIC_VALUE_V2) { + exceptionRule.expect(IllegalArgumentException.class); + exceptionRule.expectMessage("ZStandard compression is not supported for magic " + magic); + } + ByteBuffer buffer = ByteBuffer.allocate(1024); MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, magic, compression, TimestampType.CREATE_TIME, 0L); builder.append(10L, null, "a".getBytes()); @@ -679,6 +709,11 @@ public void testFilterToWithUndersizedBuffer() { @Test public void testToString() { + if (compression == CompressionType.ZSTD && magic < MAGIC_VALUE_V2) { + exceptionRule.expect(IllegalArgumentException.class); + exceptionRule.expectMessage("ZStandard compression is not supported for magic " + magic); + } + long timestamp = 1000000; MemoryRecords memoryRecords = MemoryRecords.withRecords(magic, compression, new SimpleRecord(timestamp, "key1".getBytes(), "value1".getBytes()), @@ -709,6 +744,11 @@ public void testToString() { @Test public void testFilterTo() { + if (compression == CompressionType.ZSTD && magic < MAGIC_VALUE_V2) { + exceptionRule.expect(IllegalArgumentException.class); + exceptionRule.expectMessage("ZStandard compression is not supported for magic " + magic); + } + ByteBuffer buffer = ByteBuffer.allocate(2048); MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, magic, compression, TimestampType.CREATE_TIME, 0L); builder.append(10L, null, "a".getBytes()); @@ -822,6 +862,11 @@ public void testFilterTo() { @Test public void testFilterToPreservesLogAppendTime() { + if (compression == CompressionType.ZSTD && magic < MAGIC_VALUE_V2) { + exceptionRule.expect(IllegalArgumentException.class); + exceptionRule.expectMessage("ZStandard compression is not supported for magic " + magic); + } + long logAppendTime = System.currentTimeMillis(); ByteBuffer buffer = ByteBuffer.allocate(2048); @@ -866,6 +911,11 @@ public void testFilterToPreservesLogAppendTime() { @Test public void testNextBatchSize() { + if (compression == CompressionType.ZSTD && magic < MAGIC_VALUE_V2) { + exceptionRule.expect(IllegalArgumentException.class); + exceptionRule.expectMessage("ZStandard compression is not supported for magic " + magic); + } + ByteBuffer buffer = ByteBuffer.allocate(2048); MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, magic, compression, TimestampType.LOG_APPEND_TIME, 0L, logAppendTime, pid, epoch, firstSequence); diff --git a/core/src/main/scala/kafka/api/ApiVersion.scala b/core/src/main/scala/kafka/api/ApiVersion.scala index 0e0385143429b..e9b16fafeee24 100644 --- a/core/src/main/scala/kafka/api/ApiVersion.scala +++ b/core/src/main/scala/kafka/api/ApiVersion.scala @@ -277,6 +277,7 @@ case object KAFKA_2_1_IV2 extends DefaultApiVersion { val subVersion = "IV2" val recordVersion = RecordVersion.V2 val id: Int = 19 +} object ApiVersionValidator extends Validator { diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerParameterizedIntegrationTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerParameterizedIntegrationTest.scala index 266bb391e2e4e..232cfdbf1c181 100755 --- a/core/src/test/scala/unit/kafka/log/LogCleanerParameterizedIntegrationTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogCleanerParameterizedIntegrationTest.scala @@ -131,6 +131,10 @@ class LogCleanerParameterizedIntegrationTest(compressionCodec: String) extends A @Test def testCleanerWithMessageFormatV0(): Unit = { + // zstd compression is not supported with older message formats + if (codec == CompressionType.ZSTD) + return + val largeMessageKey = 20 val (largeMessageValue, largeMessageSet) = createLargeSingleMessageSet(largeMessageKey, RecordBatch.MAGIC_VALUE_V0) val maxMessageSize = codec match { @@ -181,6 +185,10 @@ class LogCleanerParameterizedIntegrationTest(compressionCodec: String) extends A @Test def testCleaningNestedMessagesWithMultipleVersions(): Unit = { + // zstd compression is not supported with older message formats + if (codec == CompressionType.ZSTD) + return + val maxMessageSize = 192 cleaner = makeCleaner(partitions = topicPartitions, maxMessageSize = maxMessageSize) From 671e12f757720aef54caed280f92182d221f5fca Mon Sep 17 00:00:00 2001 From: Lee Dongjin Date: Wed, 26 Sep 2018 03:16:04 +0900 Subject: [PATCH 09/22] Disallow downconversion of ZSTD compressed records --- .../kafka/common/record/RecordsUtil.java | 3 +- .../record/MemoryRecordsBuilderTest.java | 64 ++++++++++++------- 2 files changed, 42 insertions(+), 25 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/common/record/RecordsUtil.java b/clients/src/main/java/org/apache/kafka/common/record/RecordsUtil.java index c9b739413175c..820b98da3eeb1 100644 --- a/clients/src/main/java/org/apache/kafka/common/record/RecordsUtil.java +++ b/clients/src/main/java/org/apache/kafka/common/record/RecordsUtil.java @@ -45,7 +45,8 @@ protected static ConvertedRecords downConvert(Iterable convertedRecords = MemoryRecords.readableRecords(buffer) .downConvert(RecordBatch.MAGIC_VALUE_V1, 0, time); MemoryRecords records = convertedRecords.records(); + List batches = Utils.toList(records.batches().iterator()); + List logRecords = Utils.toList(records.records().iterator()); - // Transactional markers are skipped when down converting to V1, so exclude them from size - verifyRecordsProcessingStats(convertedRecords.recordConversionStats(), - 3, 3, records.sizeInBytes(), sizeExcludingTxnMarkers); + switch (compressionType) { + case NONE: + // Transactional markers are skipped when down converting to V1, so exclude them from size + verifyRecordsProcessingStats(convertedRecords.recordConversionStats(), + 3, 3, records.sizeInBytes(), sizeExcludingTxnMarkers); + + assertEquals(3, batches.size()); + assertEquals(TimestampType.LOG_APPEND_TIME, batches.get(0).timestampType()); + assertEquals(TimestampType.CREATE_TIME, batches.get(1).timestampType()); + assertEquals(TimestampType.CREATE_TIME, batches.get(2).timestampType()); + + assertEquals(3, logRecords.size()); + assertEquals(ByteBuffer.wrap("1".getBytes()), logRecords.get(0).key()); + assertEquals(ByteBuffer.wrap("2".getBytes()), logRecords.get(1).key()); + assertEquals(ByteBuffer.wrap("3".getBytes()), logRecords.get(2).key()); + break; + case ZSTD: + // Downconversion of ZSTD compressed records is disallowed + verifyRecordsProcessingStats(convertedRecords.recordConversionStats(), + 3, 0, records.sizeInBytes(), sizeExcludingTxnMarkers); - List batches = Utils.toList(records.batches().iterator()); - if (compressionType != CompressionType.NONE) { - assertEquals(2, batches.size()); - assertEquals(TimestampType.LOG_APPEND_TIME, batches.get(0).timestampType()); - assertEquals(TimestampType.CREATE_TIME, batches.get(1).timestampType()); - } else { - assertEquals(3, batches.size()); - assertEquals(TimestampType.LOG_APPEND_TIME, batches.get(0).timestampType()); - assertEquals(TimestampType.CREATE_TIME, batches.get(1).timestampType()); - assertEquals(TimestampType.CREATE_TIME, batches.get(2).timestampType()); - } + assertEquals(0, batches.size()); - List logRecords = Utils.toList(records.records().iterator()); - assertEquals(3, logRecords.size()); - assertEquals(ByteBuffer.wrap("1".getBytes()), logRecords.get(0).key()); - assertEquals(ByteBuffer.wrap("2".getBytes()), logRecords.get(1).key()); - assertEquals(ByteBuffer.wrap("3".getBytes()), logRecords.get(2).key()); + assertEquals(0, logRecords.size()); + break; + default: + // Transactional markers are skipped when down converting to V1, so exclude them from size + verifyRecordsProcessingStats(convertedRecords.recordConversionStats(), + 3, 3, records.sizeInBytes(), sizeExcludingTxnMarkers); + + assertEquals(2, batches.size()); + assertEquals(TimestampType.LOG_APPEND_TIME, batches.get(0).timestampType()); + assertEquals(TimestampType.CREATE_TIME, batches.get(1).timestampType()); + + assertEquals(3, logRecords.size()); + assertEquals(ByteBuffer.wrap("1".getBytes()), logRecords.get(0).key()); + assertEquals(ByteBuffer.wrap("2".getBytes()), logRecords.get(1).key()); + assertEquals(ByteBuffer.wrap("3".getBytes()), logRecords.get(2).key()); + break; + } } @Test From c6bbabda2cdc618910ed9bcaba704ca3be2fff93 Mon Sep 17 00:00:00 2001 From: Lee Dongjin Date: Thu, 27 Sep 2018 01:43:04 +0900 Subject: [PATCH 10/22] Make the constructor of ProduceRequest.Builder public: 1. parity with FetchRequest.Builder. 2. Required for testing produce request with any version. --- .../apache/kafka/common/requests/ProduceRequest.java | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java index 69e35e498045a..a03ad582cc726 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java @@ -156,12 +156,12 @@ public static Builder forMagic(byte magic, return new Builder(minVersion, maxVersion, acks, timeout, partitionRecords, transactionalId); } - private Builder(short minVersion, - short maxVersion, - short acks, - int timeout, - Map partitionRecords, - String transactionalId) { + public Builder(short minVersion, + short maxVersion, + short acks, + int timeout, + Map partitionRecords, + String transactionalId) { super(ApiKeys.PRODUCE, minVersion, maxVersion); this.acks = acks; this.timeout = timeout; From e5a7fd4c6a0bd104b1d7e390be1c22d7ce7d4cbe Mon Sep 17 00:00:00 2001 From: Lee Dongjin Date: Thu, 27 Sep 2018 02:00:48 +0900 Subject: [PATCH 11/22] Add integration tests: FetchRequestTest, ProduceRequestTest 1. Disallow fetch request with API version < 10 for ZStandard compressed partition. (FetchRequestTest#testZStdCompressedTopic) 2. Disallow downconversion of ZSTD compressed records. (FetchRequestTest#testZStdCompressedRecords) 3. Disallow produce request with API version < 7 for ZStandard compressed partition. (ProduceRequestTest#testZSTDProduceRequest) --- .../unit/kafka/server/FetchRequestTest.scala | 87 +++++++++++++++++++ .../kafka/server/ProduceRequestTest.scala | 36 +++++++- 2 files changed, 122 insertions(+), 1 deletion(-) diff --git a/core/src/test/scala/unit/kafka/server/FetchRequestTest.scala b/core/src/test/scala/unit/kafka/server/FetchRequestTest.scala index b4315d105def3..436c04cc2f106 100644 --- a/core/src/test/scala/unit/kafka/server/FetchRequestTest.scala +++ b/core/src/test/scala/unit/kafka/server/FetchRequestTest.scala @@ -22,6 +22,7 @@ import java.util.{Optional, Properties} import kafka.api.KAFKA_0_11_0_IV2 import kafka.log.LogConfig +import kafka.message.{ProducerCompressionCodec, ZStdCompressionCodec} import kafka.utils.TestUtils import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord} import org.apache.kafka.common.TopicPartition @@ -424,6 +425,92 @@ class FetchRequestTest extends BaseRequestTest { assertFalse(resp4.responseData().containsKey(bar0)) } + @Test + def testZStdCompressedTopic(): Unit = { + // ZSTD compressed topic + val topicConfig = Map(LogConfig.CompressionTypeProp -> ZStdCompressionCodec.name) + val (topicPartition, leaderId) = createTopics(numTopics = 1, numPartitions = 1, configs = topicConfig).head + + // Produce messages (v2) + producer = TestUtils.createProducer(TestUtils.getBrokerListStrFromServers(servers), + keySerializer = new StringSerializer, + valueSerializer = new StringSerializer) + producer.send(new ProducerRecord(topicPartition.topic, topicPartition.partition, + "key1", "value1")).get + producer.send(new ProducerRecord(topicPartition.topic, topicPartition.partition, + "key2", "value2")).get + producer.send(new ProducerRecord(topicPartition.topic, topicPartition.partition, + "key3", "value3")).get + producer.close + + // fetch request with version below v10: UNSUPPORTED_COMPRESSION_TYPE error occurs + val req0 = new FetchRequest.Builder(0, 9, -1, Int.MaxValue, 0, + createPartitionMap(300, Seq(topicPartition), Map.empty)) + .setMaxBytes(800).build() + + val res0 = sendFetchRequest(leaderId, req0) + val data0 = res0.responseData.get(topicPartition) + assertEquals(Errors.UNSUPPORTED_COMPRESSION_TYPE, data0.error) + + // fetch request with version 10: works fine! + val req1= new FetchRequest.Builder(0, 10, -1, Int.MaxValue, 0, + createPartitionMap(300, Seq(topicPartition), Map.empty)) + .setMaxBytes(800).build() + val res1 = sendFetchRequest(leaderId, req1) + val data1 = res1.responseData.get(topicPartition) + assertEquals(Errors.NONE, data1.error) + assertEquals(records(data1).size, 3) + } + + @Test + def testZStdCompressedRecords(): Unit = { + // Producer compressed topic + val topicConfig = Map(LogConfig.CompressionTypeProp -> ProducerCompressionCodec.name) + val (topicPartition, leaderId) = createTopics(numTopics = 1, numPartitions = 1, configs = topicConfig).head + + // Produce ZSTD compressed messages (v2) + producer = TestUtils.createProducer(TestUtils.getBrokerListStrFromServers(servers), + compressionType = ZStdCompressionCodec.name, + keySerializer = new StringSerializer, + valueSerializer = new StringSerializer) + producer.send(new ProducerRecord(topicPartition.topic, topicPartition.partition, + "key1", "value1")).get + producer.send(new ProducerRecord(topicPartition.topic, topicPartition.partition, + "key2", "value2")).get + producer.send(new ProducerRecord(topicPartition.topic, topicPartition.partition, + "key3", "value3")).get + producer.close + + // fetch request with fetch version v1: empty response (from magic 2 to 0 down-conversion is disallowed!) + val req0 = new FetchRequest.Builder(0, 1, -1, Int.MaxValue, 0, + createPartitionMap(300, Seq(topicPartition), Map.empty)) + .setMaxBytes(800).build() + + val res0 = sendFetchRequest(leaderId, req0) + val data0 = res0.responseData.get(topicPartition) + assertEquals(Errors.NONE, data0.error) + assertEquals(records(data0).size, 0) + + // fetch request with fetch version v3: empty response (from magic 2 to 1 down-conversion is disallowed!) + val req1 = new FetchRequest.Builder(2, 3, -1, Int.MaxValue, 0, + createPartitionMap(300, Seq(topicPartition), Map.empty)) + .setMaxBytes(800).build() + + val res1 = sendFetchRequest(leaderId, req1) + val data1 = res0.responseData.get(topicPartition) + assertEquals(Errors.NONE, data1.error) + assertEquals(records(data1).size, 0) + + // fetch request with version 10: works fine! + val req2= new FetchRequest.Builder(0, 10, -1, Int.MaxValue, 0, + createPartitionMap(300, Seq(topicPartition), Map.empty)) + .setMaxBytes(800).build() + val res2 = sendFetchRequest(leaderId, req2) + val data2 = res2.responseData.get(topicPartition) + assertEquals(Errors.NONE, data2.error) + assertEquals(records(data2).size, 3) + } + private def records(partitionData: FetchResponse.PartitionData[MemoryRecords]): Seq[Record] = { partitionData.records.records.asScala.toIndexedSeq } diff --git a/core/src/test/scala/unit/kafka/server/ProduceRequestTest.scala b/core/src/test/scala/unit/kafka/server/ProduceRequestTest.scala index 4e66494374f71..3da6ecb11999a 100644 --- a/core/src/test/scala/unit/kafka/server/ProduceRequestTest.scala +++ b/core/src/test/scala/unit/kafka/server/ProduceRequestTest.scala @@ -17,10 +17,14 @@ package kafka.server +import java.util.Properties + +import kafka.log.LogConfig +import kafka.message.ZStdCompressionCodec import kafka.utils.TestUtils import org.apache.kafka.common.TopicPartition import org.apache.kafka.common.protocol.{ApiKeys, Errors} -import org.apache.kafka.common.record.{CompressionType, DefaultRecordBatch, MemoryRecords, SimpleRecord} +import org.apache.kafka.common.record._ import org.apache.kafka.common.requests.{ProduceRequest, ProduceResponse} import org.junit.Assert._ import org.junit.Test @@ -111,6 +115,36 @@ class ProduceRequestTest extends BaseRequestTest { assertEquals(-1, partitionResponse.logAppendTime) } + @Test + def testZSTDProduceRequest(): Unit = { + val topic = "topic" + val partition = 0 + + // Create a single-partition topic compressed with ZSTD + val topicConfig = new Properties + topicConfig.setProperty(LogConfig.CompressionTypeProp, ZStdCompressionCodec.name) + val partitionToLeader = TestUtils.createTopic(zkClient, topic, 1, 1, servers, topicConfig) + val leader = partitionToLeader(partition) + val memoryRecords = MemoryRecords.withRecords(CompressionType.ZSTD, + new SimpleRecord(System.currentTimeMillis(), "key".getBytes, "value".getBytes)) + val topicPartition = new TopicPartition("topic", partition) + val partitionRecords = Map(topicPartition -> memoryRecords) + + // produce request with version below v7: UNSUPPORTED_COMPRESSION_TYPE error occurs + val res0 = sendProduceRequest(leader, + new ProduceRequest.Builder(3, 6, -1, 3000, partitionRecords.asJava, null).build()) + assertEquals(Errors.UNSUPPORTED_COMPRESSION_TYPE, res0.responses.asScala.head._2.error) + + // produce request with v7: works fine! + val res1 = sendProduceRequest(leader, + new ProduceRequest.Builder(7, 7, -1, 3000, partitionRecords.asJava, null).build()) + val (tp, partitionResponse) = res1.responses.asScala.head + assertEquals(topicPartition, tp) + assertEquals(Errors.NONE, partitionResponse.error) + assertEquals(0, partitionResponse.baseOffset) + assertEquals(-1, partitionResponse.logAppendTime) + } + private def sendProduceRequest(leaderId: Int, request: ProduceRequest): ProduceResponse = { val response = connectAndSend(request, ApiKeys.PRODUCE, destination = brokerSocketServer(leaderId)) ProduceResponse.parse(response, request.version) From 06b2a5944a501078c12bd663d2cf9b9e2d2a5adf Mon Sep 17 00:00:00 2001 From: Lee Dongjin Date: Thu, 27 Sep 2018 10:21:09 +0900 Subject: [PATCH 12/22] Update zstd-jni version: 1.3.5-2 -> 1.3.5-4 --- gradle/dependencies.gradle | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/gradle/dependencies.gradle b/gradle/dependencies.gradle index f2a2a0510931b..18a25349088d2 100644 --- a/gradle/dependencies.gradle +++ b/gradle/dependencies.gradle @@ -84,7 +84,7 @@ versions += [ snappy: "1.1.7.2", zkclient: "0.10", zookeeper: "3.4.13", - zstd: "1.3.5-2" + zstd: "1.3.5-4" ] libs += [ From 561d21c0e2cd63917b5366f94d150fc6f3b0eee3 Mon Sep 17 00:00:00 2001 From: Lee Dongjin Date: Fri, 28 Sep 2018 12:41:38 +0900 Subject: [PATCH 13/22] Fix: remove unneeded newline addition at CompressionType.java --- .../java/org/apache/kafka/common/record/CompressionType.java | 1 - 1 file changed, 1 deletion(-) diff --git a/clients/src/main/java/org/apache/kafka/common/record/CompressionType.java b/clients/src/main/java/org/apache/kafka/common/record/CompressionType.java index ab91c8028bad5..352d12d834977 100644 --- a/clients/src/main/java/org/apache/kafka/common/record/CompressionType.java +++ b/clients/src/main/java/org/apache/kafka/common/record/CompressionType.java @@ -14,7 +14,6 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - package org.apache.kafka.common.record; import org.apache.kafka.common.KafkaException; From 3b4f6aeeb72d6f91f1737caff28c90fc38c820c5 Mon Sep 17 00:00:00 2001 From: Lee Dongjin Date: Wed, 3 Oct 2018 15:20:25 +0900 Subject: [PATCH 14/22] Revert "Disallow downconversion of ZSTD compressed records" This reverts commit ccd311908e2db00381634102ba1de252e974863c. --- .../kafka/common/record/RecordsUtil.java | 3 +- .../record/MemoryRecordsBuilderTest.java | 64 +++++++------------ 2 files changed, 25 insertions(+), 42 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/common/record/RecordsUtil.java b/clients/src/main/java/org/apache/kafka/common/record/RecordsUtil.java index 820b98da3eeb1..c9b739413175c 100644 --- a/clients/src/main/java/org/apache/kafka/common/record/RecordsUtil.java +++ b/clients/src/main/java/org/apache/kafka/common/record/RecordsUtil.java @@ -45,8 +45,7 @@ protected static ConvertedRecords downConvert(Iterable convertedRecords = MemoryRecords.readableRecords(buffer) .downConvert(RecordBatch.MAGIC_VALUE_V1, 0, time); MemoryRecords records = convertedRecords.records(); - List batches = Utils.toList(records.batches().iterator()); - List logRecords = Utils.toList(records.records().iterator()); - - switch (compressionType) { - case NONE: - // Transactional markers are skipped when down converting to V1, so exclude them from size - verifyRecordsProcessingStats(convertedRecords.recordConversionStats(), - 3, 3, records.sizeInBytes(), sizeExcludingTxnMarkers); - - assertEquals(3, batches.size()); - assertEquals(TimestampType.LOG_APPEND_TIME, batches.get(0).timestampType()); - assertEquals(TimestampType.CREATE_TIME, batches.get(1).timestampType()); - assertEquals(TimestampType.CREATE_TIME, batches.get(2).timestampType()); - - assertEquals(3, logRecords.size()); - assertEquals(ByteBuffer.wrap("1".getBytes()), logRecords.get(0).key()); - assertEquals(ByteBuffer.wrap("2".getBytes()), logRecords.get(1).key()); - assertEquals(ByteBuffer.wrap("3".getBytes()), logRecords.get(2).key()); - break; - case ZSTD: - // Downconversion of ZSTD compressed records is disallowed - verifyRecordsProcessingStats(convertedRecords.recordConversionStats(), - 3, 0, records.sizeInBytes(), sizeExcludingTxnMarkers); - assertEquals(0, batches.size()); + // Transactional markers are skipped when down converting to V1, so exclude them from size + verifyRecordsProcessingStats(convertedRecords.recordConversionStats(), + 3, 3, records.sizeInBytes(), sizeExcludingTxnMarkers); - assertEquals(0, logRecords.size()); - break; - default: - // Transactional markers are skipped when down converting to V1, so exclude them from size - verifyRecordsProcessingStats(convertedRecords.recordConversionStats(), - 3, 3, records.sizeInBytes(), sizeExcludingTxnMarkers); - - assertEquals(2, batches.size()); - assertEquals(TimestampType.LOG_APPEND_TIME, batches.get(0).timestampType()); - assertEquals(TimestampType.CREATE_TIME, batches.get(1).timestampType()); - - assertEquals(3, logRecords.size()); - assertEquals(ByteBuffer.wrap("1".getBytes()), logRecords.get(0).key()); - assertEquals(ByteBuffer.wrap("2".getBytes()), logRecords.get(1).key()); - assertEquals(ByteBuffer.wrap("3".getBytes()), logRecords.get(2).key()); - break; + List batches = Utils.toList(records.batches().iterator()); + if (compressionType != CompressionType.NONE) { + assertEquals(2, batches.size()); + assertEquals(TimestampType.LOG_APPEND_TIME, batches.get(0).timestampType()); + assertEquals(TimestampType.CREATE_TIME, batches.get(1).timestampType()); + } else { + assertEquals(3, batches.size()); + assertEquals(TimestampType.LOG_APPEND_TIME, batches.get(0).timestampType()); + assertEquals(TimestampType.CREATE_TIME, batches.get(1).timestampType()); + assertEquals(TimestampType.CREATE_TIME, batches.get(2).timestampType()); } + + List logRecords = Utils.toList(records.records().iterator()); + assertEquals(3, logRecords.size()); + assertEquals(ByteBuffer.wrap("1".getBytes()), logRecords.get(0).key()); + assertEquals(ByteBuffer.wrap("2".getBytes()), logRecords.get(1).key()); + assertEquals(ByteBuffer.wrap("3".getBytes()), logRecords.get(2).key()); } @Test From 607defc80fedf11a8a33cba33d7893976cef9875 Mon Sep 17 00:00:00 2001 From: Lee Dongjin Date: Thu, 4 Oct 2018 01:17:20 +0900 Subject: [PATCH 15/22] Reimplement down-conversion logic --- .../record/LazyDownConversionRecords.java | 16 +++- .../main/scala/kafka/server/KafkaApis.scala | 81 ++++++++++--------- .../unit/kafka/server/FetchRequestTest.scala | 66 ++++++++++----- 3 files changed, 106 insertions(+), 57 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/common/record/LazyDownConversionRecords.java b/clients/src/main/java/org/apache/kafka/common/record/LazyDownConversionRecords.java index d58689de119a8..cbc0783c5222c 100644 --- a/clients/src/main/java/org/apache/kafka/common/record/LazyDownConversionRecords.java +++ b/clients/src/main/java/org/apache/kafka/common/record/LazyDownConversionRecords.java @@ -23,6 +23,7 @@ import java.util.ArrayList; import java.util.List; import java.util.Objects; +import java.util.stream.IntStream; /** * Encapsulation for holding records that require down-conversion in a lazy, chunked manner (KIP-283). See @@ -59,6 +60,10 @@ public LazyDownConversionRecords(TopicPartition topicPartition, Records records, java.util.Iterator it = iterator(0); if (it.hasNext()) { firstConvertedBatch = it.next(); + // KIP-110: ZSTD compressed record batch is not allowed to be down-converted. + if (firstConvertedBatch.records().batchIterator().peek().compressionType() == CompressionType.ZSTD) { + throw new IllegalArgumentException("ZSTD is not allowed to be down-converted."); + } sizeInBytes = Math.max(records.sizeInBytes(), firstConvertedBatch.records().sizeInBytes()); } else { // If there are no messages we got after down-conversion, make sure we are able to send at least an overflow @@ -150,7 +155,7 @@ protected ConvertedRecords makeNext() { } while (batchIterator.hasNext()) { - List batches = new ArrayList<>(); + final List batches = new ArrayList<>(); boolean isFirstBatch = true; long sizeSoFar = 0; @@ -162,7 +167,14 @@ protected ConvertedRecords makeNext() { sizeSoFar += currentBatch.sizeInBytes(); isFirstBatch = false; } - ConvertedRecords convertedRecords = RecordsUtil.downConvert(batches, toMagic, firstOffset, time); + + // KIP-110: ZSTD compressed record batch is not allowed to be down-converted. So, down-convert the batches + // only until it is not compressed with ZSTD. + int zStdIndex = IntStream.range(0, batches.size()) + .filter(i -> batches.get(i).compressionType() == CompressionType.ZSTD) + .findFirst().orElse(batches.size()); + + ConvertedRecords convertedRecords = RecordsUtil.downConvert(batches.subList(0, zStdIndex), toMagic, firstOffset, time); // During conversion, it is possible that we drop certain batches because they do not have an equivalent // representation in the message format we want to convert to. For example, V0 and V1 message formats // have no notion of transaction markers which were introduced in V2 so they get dropped during conversion. diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index c8576fbbb6234..e14b196850f3a 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -540,46 +540,55 @@ class KafkaApis(val requestChannel: RequestChannel, def maybeConvertFetchedData(tp: TopicPartition, partitionData: FetchResponse.PartitionData[Records]): FetchResponse.PartitionData[BaseRecords] = { - // Down-conversion of the fetched records is needed when the stored magic version is - // greater than that supported by the client (as indicated by the fetch request version). If the - // configured magic version for the topic is less than or equal to that supported by the version of the - // fetch request, we skip the iteration through the records in order to check the magic version since we - // know it must be supported. However, if the magic version is changed from a higher version back to a - // lower version, this check will no longer be valid and we will fail to down-convert the messages - // which were written in the new format prior to the version downgrade. - val unconvertedRecords = partitionData.records val logConfig = replicaManager.getLogConfig(tp) - val downConvertMagic = - logConfig.map(_.messageFormatVersion.recordVersion.value).flatMap { magic => - if (magic > RecordBatch.MAGIC_VALUE_V0 && versionId <= 1 && !unconvertedRecords.hasCompatibleMagic(RecordBatch.MAGIC_VALUE_V0)) - Some(RecordBatch.MAGIC_VALUE_V0) - else if (magic > RecordBatch.MAGIC_VALUE_V1 && versionId <= 3 && !unconvertedRecords.hasCompatibleMagic(RecordBatch.MAGIC_VALUE_V1)) - Some(RecordBatch.MAGIC_VALUE_V1) - else - None - } - // For fetch requests from clients, check if down-conversion is disabled for the particular partition - if (downConvertMagic.isDefined && !fetchRequest.isFromFollower && !logConfig.forall(_.messageDownConversionEnable)) { - trace(s"Conversion to message format ${downConvertMagic.get} is disabled for partition $tp. Sending unsupported version response to $clientId.") - errorResponse(Errors.UNSUPPORTED_VERSION) + if (logConfig.forall(_.compressionType == ZStdCompressionCodec.name) && versionId < 10) { + trace(s"Fetching messages is disabled for Zstd compressed partition $tp. Sending unsupported version response to $clientId.") + errorResponse(Errors.UNSUPPORTED_COMPRESSION_TYPE) } else { - if (logConfig.forall(_.compressionType == "zstd") && versionId < 10) { - trace(s"Fetching messages is disabled for Zstd compressed partition $tp. Sending unsupported version response to $clientId.") - errorResponse(Errors.UNSUPPORTED_COMPRESSION_TYPE) - } else { - val convertedRecords = - downConvertMagic.map { magic => - trace(s"Down converting records from partition $tp to message format version $magic for fetch request from $clientId") - // Because down-conversion is extremely memory intensive, we want to try and delay the down-conversion as much - // as possible. With KIP-283, we have the ability to lazily down-convert in a chunked manner. The lazy, chunked - // down-conversion always guarantees that at least one batch of messages is down-converted and sent out to the - // client. - new LazyDownConversionRecords(tp, unconvertedRecords, magic, fetchContext.getFetchOffset(tp).get, time) - }.getOrElse(unconvertedRecords) - new FetchResponse.PartitionData[BaseRecords](partitionData.error, partitionData.highWatermark, + // Down-conversion of the fetched records is needed when the stored magic version is + // greater than that supported by the client (as indicated by the fetch request version). If the + // configured magic version for the topic is less than or equal to that supported by the version of the + // fetch request, we skip the iteration through the records in order to check the magic version since we + // know it must be supported. However, if the magic version is changed from a higher version back to a + // lower version, this check will no longer be valid and we will fail to down-convert the messages + // which were written in the new format prior to the version downgrade. + val unconvertedRecords = partitionData.records + val downConvertMagic = + logConfig.map(_.messageFormatVersion.recordVersion.value).flatMap { magic => + if (magic > RecordBatch.MAGIC_VALUE_V0 && versionId <= 1 && !unconvertedRecords.hasCompatibleMagic(RecordBatch.MAGIC_VALUE_V0)) + Some(RecordBatch.MAGIC_VALUE_V0) + else if (magic > RecordBatch.MAGIC_VALUE_V1 && versionId <= 3 && !unconvertedRecords.hasCompatibleMagic(RecordBatch.MAGIC_VALUE_V1)) + Some(RecordBatch.MAGIC_VALUE_V1) + else + None + } + + downConvertMagic match { + case Some(magic) => + // For fetch requests from clients, check if down-conversion is disabled for the particular partition + if (!fetchRequest.isFromFollower && !logConfig.forall(_.messageDownConversionEnable)) { + trace(s"Conversion to message format ${downConvertMagic.get} is disabled for partition $tp. Sending unsupported version response to $clientId.") + errorResponse(Errors.UNSUPPORTED_VERSION) + } else { + try { + trace(s"Down converting records from partition $tp to message format version $magic for fetch request from $clientId") + // Because down-conversion is extremely memory intensive, we want to try and delay the down-conversion as much + // as possible. With KIP-283, we have the ability to lazily down-convert in a chunked manner. The lazy, chunked + // down-conversion always guarantees that at least one batch of messages is down-converted and sent out to the + // client. + new FetchResponse.PartitionData[BaseRecords](partitionData.error, partitionData.highWatermark, + FetchResponse.INVALID_LAST_STABLE_OFFSET, partitionData.logStartOffset, partitionData.abortedTransactions, + new LazyDownConversionRecords(tp, unconvertedRecords, magic, fetchContext.getFetchOffset(tp).get, time)) + } catch { + case _: IllegalArgumentException => + trace(s"Down-converting Zstd compressed records in partition $tp is disallowed. Sending unsupported version response to $clientId.") + errorResponse(Errors.UNSUPPORTED_COMPRESSION_TYPE) + } + } + case None => new FetchResponse.PartitionData[BaseRecords](partitionData.error, partitionData.highWatermark, FetchResponse.INVALID_LAST_STABLE_OFFSET, partitionData.logStartOffset, partitionData.abortedTransactions, - convertedRecords) + unconvertedRecords) } } } diff --git a/core/src/test/scala/unit/kafka/server/FetchRequestTest.scala b/core/src/test/scala/unit/kafka/server/FetchRequestTest.scala index 436c04cc2f106..66cc3cfb322ff 100644 --- a/core/src/test/scala/unit/kafka/server/FetchRequestTest.scala +++ b/core/src/test/scala/unit/kafka/server/FetchRequestTest.scala @@ -22,7 +22,7 @@ import java.util.{Optional, Properties} import kafka.api.KAFKA_0_11_0_IV2 import kafka.log.LogConfig -import kafka.message.{ProducerCompressionCodec, ZStdCompressionCodec} +import kafka.message.{GZIPCompressionCodec, ProducerCompressionCodec, ZStdCompressionCodec} import kafka.utils.TestUtils import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord} import org.apache.kafka.common.TopicPartition @@ -465,23 +465,32 @@ class FetchRequestTest extends BaseRequestTest { @Test def testZStdCompressedRecords(): Unit = { // Producer compressed topic - val topicConfig = Map(LogConfig.CompressionTypeProp -> ProducerCompressionCodec.name) + val topicConfig = Map(LogConfig.CompressionTypeProp -> ProducerCompressionCodec.name, + LogConfig.MessageFormatVersionProp -> "2.0.0") val (topicPartition, leaderId) = createTopics(numTopics = 1, numPartitions = 1, configs = topicConfig).head + // Produce GZIP compressed messages (v2) + val producer1 = TestUtils.createProducer(TestUtils.getBrokerListStrFromServers(servers), + compressionType = GZIPCompressionCodec.name, + keySerializer = new StringSerializer, + valueSerializer = new StringSerializer) + producer1.send(new ProducerRecord(topicPartition.topic, topicPartition.partition, + "key1", "value1")).get + producer1.close // Produce ZSTD compressed messages (v2) - producer = TestUtils.createProducer(TestUtils.getBrokerListStrFromServers(servers), + val producer2 = TestUtils.createProducer(TestUtils.getBrokerListStrFromServers(servers), compressionType = ZStdCompressionCodec.name, keySerializer = new StringSerializer, valueSerializer = new StringSerializer) - producer.send(new ProducerRecord(topicPartition.topic, topicPartition.partition, - "key1", "value1")).get - producer.send(new ProducerRecord(topicPartition.topic, topicPartition.partition, + producer2.send(new ProducerRecord(topicPartition.topic, topicPartition.partition, "key2", "value2")).get - producer.send(new ProducerRecord(topicPartition.topic, topicPartition.partition, + producer2.send(new ProducerRecord(topicPartition.topic, topicPartition.partition, "key3", "value3")).get - producer.close + producer2.close - // fetch request with fetch version v1: empty response (from magic 2 to 0 down-conversion is disallowed!) + // fetch request with fetch version v1: + // gzip compressed record is returned with down-conversion. + // zstd compressed record raises UNSUPPORTED_COMPRESSION_TYPE error. val req0 = new FetchRequest.Builder(0, 1, -1, Int.MaxValue, 0, createPartitionMap(300, Seq(topicPartition), Map.empty)) .setMaxBytes(800).build() @@ -489,26 +498,45 @@ class FetchRequestTest extends BaseRequestTest { val res0 = sendFetchRequest(leaderId, req0) val data0 = res0.responseData.get(topicPartition) assertEquals(Errors.NONE, data0.error) - assertEquals(records(data0).size, 0) + assertEquals(1, records(data0).size) - // fetch request with fetch version v3: empty response (from magic 2 to 1 down-conversion is disallowed!) - val req1 = new FetchRequest.Builder(2, 3, -1, Int.MaxValue, 0, - createPartitionMap(300, Seq(topicPartition), Map.empty)) + /* Not working... why? + val req1 = new FetchRequest.Builder(0, 1, -1, Int.MaxValue, 0, + createPartitionMap(300, Seq(topicPartition), Map(topicPartition -> 1L))) .setMaxBytes(800).build() val res1 = sendFetchRequest(leaderId, req1) - val data1 = res0.responseData.get(topicPartition) - assertEquals(Errors.NONE, data1.error) - assertEquals(records(data1).size, 0) + val data1 = res1.responseData.get(topicPartition) + assertEquals(Errors.UNSUPPORTED_COMPRESSION_TYPE, data1.error)*/ - // fetch request with version 10: works fine! - val req2= new FetchRequest.Builder(0, 10, -1, Int.MaxValue, 0, + // fetch request with fetch version v3: empty response (from magic 2 to 1 down-conversion is disallowed!) + // zstd compressed record raises UNSUPPORTED_COMPRESSION_TYPE error. + val req2 = new FetchRequest.Builder(2, 3, -1, Int.MaxValue, 0, createPartitionMap(300, Seq(topicPartition), Map.empty)) .setMaxBytes(800).build() + val res2 = sendFetchRequest(leaderId, req2) val data2 = res2.responseData.get(topicPartition) assertEquals(Errors.NONE, data2.error) - assertEquals(records(data2).size, 3) + assertEquals(1, records(data2).size) + + val req3 = new FetchRequest.Builder(0, 1, -1, Int.MaxValue, 0, + createPartitionMap(300, Seq(topicPartition), Map(topicPartition -> 1L))) + .setMaxBytes(800).build() + + /* Not working... why? + val res3 = sendFetchRequest(leaderId, req3) + val data3 = res3.responseData.get(topicPartition) + assertEquals(Errors.UNSUPPORTED_COMPRESSION_TYPE, data3.error)*/ + + // fetch request with version 10: works fine! + val req4= new FetchRequest.Builder(0, 10, -1, Int.MaxValue, 0, + createPartitionMap(300, Seq(topicPartition), Map.empty)) + .setMaxBytes(800).build() + val res4 = sendFetchRequest(leaderId, req4) + val data4 = res4.responseData.get(topicPartition) + assertEquals(Errors.NONE, data4.error) + assertEquals(3, records(data4).size) } private def records(partitionData: FetchResponse.PartitionData[MemoryRecords]): Seq[Record] = { From f34d3239ec05bc2baa2c0d8c25f0659e8de0a3f1 Mon Sep 17 00:00:00 2001 From: Lee Dongjin Date: Fri, 5 Oct 2018 08:25:06 +0900 Subject: [PATCH 16/22] Update for the previous commit (Thanks to Jason Gustafson's guide) --- .../record/LazyDownConversionRecords.java | 16 ++--- .../record/LazyDownConversionRecordsSend.java | 62 ++++++++++++------- .../kafka/common/record/RecordsUtil.java | 11 +++- .../record/MemoryRecordsBuilderTest.java | 5 +- .../main/scala/kafka/server/KafkaApis.scala | 4 +- .../unit/kafka/server/FetchRequestTest.scala | 19 +++--- 6 files changed, 66 insertions(+), 51 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/common/record/LazyDownConversionRecords.java b/clients/src/main/java/org/apache/kafka/common/record/LazyDownConversionRecords.java index cbc0783c5222c..4e844736bbd38 100644 --- a/clients/src/main/java/org/apache/kafka/common/record/LazyDownConversionRecords.java +++ b/clients/src/main/java/org/apache/kafka/common/record/LazyDownConversionRecords.java @@ -23,7 +23,6 @@ import java.util.ArrayList; import java.util.List; import java.util.Objects; -import java.util.stream.IntStream; /** * Encapsulation for holding records that require down-conversion in a lazy, chunked manner (KIP-283). See @@ -45,6 +44,9 @@ public class LazyDownConversionRecords implements BaseRecords { * @param firstOffset The starting offset for down-converted records. This only impacts some cases. See * {@link RecordsUtil#downConvert(Iterable, byte, long, Time)} for an explanation. * @param time The time instance to use + * + * @throws org.apache.kafka.common.errors.UnsupportedCompressionTypeException If the first batch to down-convert + * has a compression type which we do not support down-conversion for. */ public LazyDownConversionRecords(TopicPartition topicPartition, Records records, byte toMagic, long firstOffset, Time time) { this.topicPartition = Objects.requireNonNull(topicPartition); @@ -60,10 +62,6 @@ public LazyDownConversionRecords(TopicPartition topicPartition, Records records, java.util.Iterator it = iterator(0); if (it.hasNext()) { firstConvertedBatch = it.next(); - // KIP-110: ZSTD compressed record batch is not allowed to be down-converted. - if (firstConvertedBatch.records().batchIterator().peek().compressionType() == CompressionType.ZSTD) { - throw new IllegalArgumentException("ZSTD is not allowed to be down-converted."); - } sizeInBytes = Math.max(records.sizeInBytes(), firstConvertedBatch.records().sizeInBytes()); } else { // If there are no messages we got after down-conversion, make sure we are able to send at least an overflow @@ -168,13 +166,7 @@ protected ConvertedRecords makeNext() { isFirstBatch = false; } - // KIP-110: ZSTD compressed record batch is not allowed to be down-converted. So, down-convert the batches - // only until it is not compressed with ZSTD. - int zStdIndex = IntStream.range(0, batches.size()) - .filter(i -> batches.get(i).compressionType() == CompressionType.ZSTD) - .findFirst().orElse(batches.size()); - - ConvertedRecords convertedRecords = RecordsUtil.downConvert(batches.subList(0, zStdIndex), toMagic, firstOffset, time); + ConvertedRecords convertedRecords = RecordsUtil.downConvert(batches, toMagic, firstOffset, time); // During conversion, it is possible that we drop certain batches because they do not have an equivalent // representation in the message format we want to convert to. For example, V0 and V1 message formats // have no notion of transaction markers which were introduced in V2 so they get dropped during conversion. diff --git a/clients/src/main/java/org/apache/kafka/common/record/LazyDownConversionRecordsSend.java b/clients/src/main/java/org/apache/kafka/common/record/LazyDownConversionRecordsSend.java index f0fab7d876d92..ad1f97fa0cbaf 100644 --- a/clients/src/main/java/org/apache/kafka/common/record/LazyDownConversionRecordsSend.java +++ b/clients/src/main/java/org/apache/kafka/common/record/LazyDownConversionRecordsSend.java @@ -17,6 +17,7 @@ package org.apache.kafka.common.record; import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.errors.UnsupportedCompressionTypeException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -45,35 +46,50 @@ public LazyDownConversionRecordsSend(String destination, LazyDownConversionRecor convertedRecordsIterator = records().iterator(MAX_READ_SIZE); } + private MemoryRecords buildOverflowBatch(int remaining) { + // We do not have any records left to down-convert. Construct an overflow message for the length remaining. + // This message will be ignored by the consumer because its length will be past the length of maximum + // possible response size. + // DefaultRecordBatch => + // BaseOffset => Int64 + // Length => Int32 + // ... + ByteBuffer overflowMessageBatch = ByteBuffer.allocate( + Math.max(MIN_OVERFLOW_MESSAGE_LENGTH, Math.min(remaining + 1, MAX_READ_SIZE))); + overflowMessageBatch.putLong(-1L); + + // Fill in the length of the overflow batch. A valid batch must be at least as long as the minimum batch + // overhead. + overflowMessageBatch.putInt(Math.max(remaining + 1, DefaultRecordBatch.RECORD_BATCH_OVERHEAD)); + log.debug("Constructed overflow message batch for partition {} with length={}", topicPartition(), remaining); + return MemoryRecords.readableRecords(overflowMessageBatch); + } + @Override public long writeTo(GatheringByteChannel channel, long previouslyWritten, int remaining) throws IOException { if (convertedRecordsWriter == null || convertedRecordsWriter.completed()) { MemoryRecords convertedRecords; - // Check if we have more chunks left to down-convert - if (convertedRecordsIterator.hasNext()) { - // Get next chunk of down-converted messages - ConvertedRecords recordsAndStats = convertedRecordsIterator.next(); - convertedRecords = recordsAndStats.records(); - recordConversionStats.add(recordsAndStats.recordConversionStats()); - log.debug("Down-converted records for partition {} with length={}", topicPartition(), convertedRecords.sizeInBytes()); - } else { - // We do not have any records left to down-convert. Construct an overflow message for the length remaining. - // This message will be ignored by the consumer because its length will be past the length of maximum - // possible response size. - // DefaultRecordBatch => - // BaseOffset => Int64 - // Length => Int32 - // ... - ByteBuffer overflowMessageBatch = ByteBuffer.allocate( - Math.max(MIN_OVERFLOW_MESSAGE_LENGTH, Math.min(remaining + 1, MAX_READ_SIZE))); - overflowMessageBatch.putLong(-1L); - // Fill in the length of the overflow batch. A valid batch must be at least as long as the minimum batch - // overhead. - overflowMessageBatch.putInt(Math.max(remaining + 1, DefaultRecordBatch.RECORD_BATCH_OVERHEAD)); - convertedRecords = MemoryRecords.readableRecords(overflowMessageBatch); - log.debug("Constructed overflow message batch for partition {} with length={}", topicPartition(), remaining); + try { + // Check if we have more chunks left to down-convert + if (convertedRecordsIterator.hasNext()) { + // Get next chunk of down-converted messages + ConvertedRecords recordsAndStats = convertedRecordsIterator.next(); + convertedRecords = recordsAndStats.records(); + recordConversionStats.add(recordsAndStats.recordConversionStats()); + log.debug("Down-converted records for partition {} with length={}", topicPartition(), convertedRecords.sizeInBytes()); + } else { + convertedRecords = buildOverflowBatch(remaining); + } + } catch (UnsupportedCompressionTypeException e) { + // We have encountered a compression type which does not support down-conversion (e.g. zstd). + // Since we have already sent at least one batch and we have committed to the fetch size, we + // send an overflow batch. The consumer will read the first few records and then fetch from the + // offset of the batch which has the unsupported compression type. At that time, we will + // send back the UNSUPPORTED_COMPRESSION_TYPE erro which will allow the consumer to fail gracefully. + convertedRecords = buildOverflowBatch(remaining); } + convertedRecordsWriter = new DefaultRecordsSend(destination(), convertedRecords, Math.min(convertedRecords.sizeInBytes(), remaining)); } return convertedRecordsWriter.writeTo(channel); diff --git a/clients/src/main/java/org/apache/kafka/common/record/RecordsUtil.java b/clients/src/main/java/org/apache/kafka/common/record/RecordsUtil.java index c9b739413175c..3b0c59a24a521 100644 --- a/clients/src/main/java/org/apache/kafka/common/record/RecordsUtil.java +++ b/clients/src/main/java/org/apache/kafka/common/record/RecordsUtil.java @@ -16,6 +16,7 @@ */ package org.apache.kafka.common.record; +import org.apache.kafka.common.errors.UnsupportedCompressionTypeException; import org.apache.kafka.common.utils.Time; import java.nio.ByteBuffer; @@ -45,8 +46,14 @@ protected static ConvertedRecords downConvert(Iterable - trace(s"Down-converting Zstd compressed records in partition $tp is disallowed. Sending unsupported version response to $clientId.") + case e: UnsupportedCompressionTypeException => + trace("Received unsupported compression type error during down-conversion", e) errorResponse(Errors.UNSUPPORTED_COMPRESSION_TYPE) } } diff --git a/core/src/test/scala/unit/kafka/server/FetchRequestTest.scala b/core/src/test/scala/unit/kafka/server/FetchRequestTest.scala index 66cc3cfb322ff..72a28549cf584 100644 --- a/core/src/test/scala/unit/kafka/server/FetchRequestTest.scala +++ b/core/src/test/scala/unit/kafka/server/FetchRequestTest.scala @@ -441,7 +441,7 @@ class FetchRequestTest extends BaseRequestTest { "key2", "value2")).get producer.send(new ProducerRecord(topicPartition.topic, topicPartition.partition, "key3", "value3")).get - producer.close + producer.close() // fetch request with version below v10: UNSUPPORTED_COMPRESSION_TYPE error occurs val req0 = new FetchRequest.Builder(0, 9, -1, Int.MaxValue, 0, @@ -459,7 +459,7 @@ class FetchRequestTest extends BaseRequestTest { val res1 = sendFetchRequest(leaderId, req1) val data1 = res1.responseData.get(topicPartition) assertEquals(Errors.NONE, data1.error) - assertEquals(records(data1).size, 3) + assertEquals(3, records(data1).size) } @Test @@ -476,7 +476,7 @@ class FetchRequestTest extends BaseRequestTest { valueSerializer = new StringSerializer) producer1.send(new ProducerRecord(topicPartition.topic, topicPartition.partition, "key1", "value1")).get - producer1.close + producer1.close() // Produce ZSTD compressed messages (v2) val producer2 = TestUtils.createProducer(TestUtils.getBrokerListStrFromServers(servers), compressionType = ZStdCompressionCodec.name, @@ -486,9 +486,9 @@ class FetchRequestTest extends BaseRequestTest { "key2", "value2")).get producer2.send(new ProducerRecord(topicPartition.topic, topicPartition.partition, "key3", "value3")).get - producer2.close + producer2.close() - // fetch request with fetch version v1: + // fetch request with fetch version v1 (magic 0): // gzip compressed record is returned with down-conversion. // zstd compressed record raises UNSUPPORTED_COMPRESSION_TYPE error. val req0 = new FetchRequest.Builder(0, 1, -1, Int.MaxValue, 0, @@ -500,16 +500,16 @@ class FetchRequestTest extends BaseRequestTest { assertEquals(Errors.NONE, data0.error) assertEquals(1, records(data0).size) - /* Not working... why? val req1 = new FetchRequest.Builder(0, 1, -1, Int.MaxValue, 0, createPartitionMap(300, Seq(topicPartition), Map(topicPartition -> 1L))) .setMaxBytes(800).build() val res1 = sendFetchRequest(leaderId, req1) val data1 = res1.responseData.get(topicPartition) - assertEquals(Errors.UNSUPPORTED_COMPRESSION_TYPE, data1.error)*/ + assertEquals(Errors.UNSUPPORTED_COMPRESSION_TYPE, data1.error) - // fetch request with fetch version v3: empty response (from magic 2 to 1 down-conversion is disallowed!) + // fetch request with fetch version v3 (magic 1): + // gzip compressed record is returned with down-conversion. // zstd compressed record raises UNSUPPORTED_COMPRESSION_TYPE error. val req2 = new FetchRequest.Builder(2, 3, -1, Int.MaxValue, 0, createPartitionMap(300, Seq(topicPartition), Map.empty)) @@ -524,10 +524,9 @@ class FetchRequestTest extends BaseRequestTest { createPartitionMap(300, Seq(topicPartition), Map(topicPartition -> 1L))) .setMaxBytes(800).build() - /* Not working... why? val res3 = sendFetchRequest(leaderId, req3) val data3 = res3.responseData.get(topicPartition) - assertEquals(Errors.UNSUPPORTED_COMPRESSION_TYPE, data3.error)*/ + assertEquals(Errors.UNSUPPORTED_COMPRESSION_TYPE, data3.error) // fetch request with version 10: works fine! val req4= new FetchRequest.Builder(0, 10, -1, Int.MaxValue, 0, From 1eee287a574a1c2babbbad8b2b1b5b263a2e5066 Mon Sep 17 00:00:00 2001 From: Lee Dongjin Date: Sat, 6 Oct 2018 00:33:43 +0900 Subject: [PATCH 17/22] Add UNSUPPORTED_COMPRESSION_TYPE to the possible error codes list of FetchResponse --- .../java/org/apache/kafka/common/requests/FetchResponse.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java index cf72664522244..9c29d375ccba0 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java @@ -61,6 +61,8 @@ * - {@link Errors#UNKNOWN_LEADER_EPOCH} If the epoch is larger than the broker's epoch * - {@link Errors#UNKNOWN_TOPIC_OR_PARTITION} If the broker does not have metadata for a topic or partition * - {@link Errors#KAFKA_STORAGE_ERROR} If the log directory for one of the requested partitions is offline + * - {@link Errors#UNSUPPORTED_COMPRESSION_TYPE} If a fetched topic is using a compression type which is + * not supported by the fetch request version * - {@link Errors#UNKNOWN_SERVER_ERROR} For any unexpected errors */ public class FetchResponse extends AbstractResponse { From bf959577ccf3d832e902fc3fb8a3bc69b4ce01f7 Mon Sep 17 00:00:00 2001 From: Lee Dongjin Date: Sat, 6 Oct 2018 01:13:29 +0900 Subject: [PATCH 18/22] Refactor tests: MemoryRecordsTest, MemoryRecordsBuilderTest --- .../record/MemoryRecordsBuilderTest.java | 67 ++++++------------- .../common/record/MemoryRecordsTest.java | 52 +++++--------- 2 files changed, 35 insertions(+), 84 deletions(-) diff --git a/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsBuilderTest.java b/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsBuilderTest.java index 84c668283d423..552b4e6c42b03 100644 --- a/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsBuilderTest.java +++ b/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsBuilderTest.java @@ -33,6 +33,7 @@ import java.util.List; import java.util.Random; +import static org.apache.kafka.common.record.RecordBatch.MAGIC_VALUE_V2; import static org.apache.kafka.common.utils.Utils.utf8; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; @@ -57,10 +58,7 @@ public MemoryRecordsBuilderTest(int bufferOffset, CompressionType compressionTyp @Test public void testWriteEmptyRecordSet() { - if (compressionType == CompressionType.ZSTD) { - exceptionRule.expect(IllegalArgumentException.class); - exceptionRule.expectMessage("ZStandard compression is not supported for magic 0"); - } + expectExceptionWithZStd(compressionType, RecordBatch.MAGIC_VALUE_V0); ByteBuffer buffer = ByteBuffer.allocate(128); buffer.position(bufferOffset); @@ -217,10 +215,7 @@ public void testWriteEndTxnMarkerNonControlBatch() { @Test public void testCompressionRateV0() { - if (compressionType == CompressionType.ZSTD) { - exceptionRule.expect(IllegalArgumentException.class); - exceptionRule.expectMessage("ZStandard compression is not supported for magic 0"); - } + expectExceptionWithZStd(compressionType, RecordBatch.MAGIC_VALUE_V0); ByteBuffer buffer = ByteBuffer.allocate(1024); buffer.position(bufferOffset); @@ -277,10 +272,7 @@ public void testEstimatedSizeInBytes() { @Test public void testCompressionRateV1() { - if (compressionType == CompressionType.ZSTD) { - exceptionRule.expect(IllegalArgumentException.class); - exceptionRule.expectMessage("ZStandard compression is not supported for magic 1"); - } + expectExceptionWithZStd(compressionType, RecordBatch.MAGIC_VALUE_V1); ByteBuffer buffer = ByteBuffer.allocate(1024); buffer.position(bufferOffset); @@ -313,10 +305,7 @@ public void testCompressionRateV1() { @Test public void buildUsingLogAppendTime() { - if (compressionType == CompressionType.ZSTD) { - exceptionRule.expect(IllegalArgumentException.class); - exceptionRule.expectMessage("ZStandard compression is not supported for magic 1"); - } + expectExceptionWithZStd(compressionType, RecordBatch.MAGIC_VALUE_V1); ByteBuffer buffer = ByteBuffer.allocate(1024); buffer.position(bufferOffset); @@ -347,10 +336,7 @@ public void buildUsingLogAppendTime() { @Test public void buildUsingCreateTime() { - if (compressionType == CompressionType.ZSTD) { - exceptionRule.expect(IllegalArgumentException.class); - exceptionRule.expectMessage("ZStandard compression is not supported for magic 1"); - } + expectExceptionWithZStd(compressionType, RecordBatch.MAGIC_VALUE_V1); ByteBuffer buffer = ByteBuffer.allocate(1024); buffer.position(bufferOffset); @@ -383,10 +369,7 @@ public void buildUsingCreateTime() { @Test public void testAppendedChecksumConsistency() { - if (compressionType == CompressionType.ZSTD) { - exceptionRule.expect(IllegalArgumentException.class); - exceptionRule.expectMessage("ZStandard compression is not supported for magic 0"); - } + expectExceptionWithZStd(compressionType, RecordBatch.MAGIC_VALUE_V0); ByteBuffer buffer = ByteBuffer.allocate(512); for (byte magic : Arrays.asList(RecordBatch.MAGIC_VALUE_V0, RecordBatch.MAGIC_VALUE_V1, RecordBatch.MAGIC_VALUE_V2)) { @@ -432,10 +415,7 @@ public void testSmallWriteLimit() { @Test public void writePastLimit() { - if (compressionType == CompressionType.ZSTD) { - exceptionRule.expect(IllegalArgumentException.class); - exceptionRule.expectMessage("ZStandard compression is not supported for magic 1"); - } + expectExceptionWithZStd(compressionType, RecordBatch.MAGIC_VALUE_V1); ByteBuffer buffer = ByteBuffer.allocate(64); buffer.position(bufferOffset); @@ -542,10 +522,7 @@ public void convertV2ToV1UsingMixedCreateAndLogAppendTime() { @Test public void convertToV1WithMixedV0AndV2Data() { - if (compressionType == CompressionType.ZSTD) { - exceptionRule.expect(IllegalArgumentException.class); - exceptionRule.expectMessage("ZStandard compression is not supported for magic 0"); - } + expectExceptionWithZStd(compressionType, RecordBatch.MAGIC_VALUE_V0); ByteBuffer buffer = ByteBuffer.allocate(512); MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, RecordBatch.MAGIC_VALUE_V0, @@ -621,10 +598,7 @@ public void convertToV1WithMixedV0AndV2Data() { @Test public void shouldThrowIllegalStateExceptionOnBuildWhenAborted() throws Exception { - if (compressionType == CompressionType.ZSTD) { - exceptionRule.expect(IllegalArgumentException.class); - exceptionRule.expectMessage("ZStandard compression is not supported for magic 0"); - } + expectExceptionWithZStd(compressionType, RecordBatch.MAGIC_VALUE_V0); ByteBuffer buffer = ByteBuffer.allocate(128); buffer.position(bufferOffset); @@ -643,10 +617,7 @@ public void shouldThrowIllegalStateExceptionOnBuildWhenAborted() throws Exceptio @Test public void shouldResetBufferToInitialPositionOnAbort() throws Exception { - if (compressionType == CompressionType.ZSTD) { - exceptionRule.expect(IllegalArgumentException.class); - exceptionRule.expectMessage("ZStandard compression is not supported for magic 0"); - } + expectExceptionWithZStd(compressionType, RecordBatch.MAGIC_VALUE_V0); ByteBuffer buffer = ByteBuffer.allocate(128); buffer.position(bufferOffset); @@ -661,10 +632,7 @@ public void shouldResetBufferToInitialPositionOnAbort() throws Exception { @Test public void shouldThrowIllegalStateExceptionOnCloseWhenAborted() throws Exception { - if (compressionType == CompressionType.ZSTD) { - exceptionRule.expect(IllegalArgumentException.class); - exceptionRule.expectMessage("ZStandard compression is not supported for magic 0"); - } + expectExceptionWithZStd(compressionType, RecordBatch.MAGIC_VALUE_V0); ByteBuffer buffer = ByteBuffer.allocate(128); buffer.position(bufferOffset); @@ -683,10 +651,7 @@ public void shouldThrowIllegalStateExceptionOnCloseWhenAborted() throws Exceptio @Test public void shouldThrowIllegalStateExceptionOnAppendWhenAborted() throws Exception { - if (compressionType == CompressionType.ZSTD) { - exceptionRule.expect(IllegalArgumentException.class); - exceptionRule.expectMessage("ZStandard compression is not supported for magic 0"); - } + expectExceptionWithZStd(compressionType, RecordBatch.MAGIC_VALUE_V0); ByteBuffer buffer = ByteBuffer.allocate(128); buffer.position(bufferOffset); @@ -769,4 +734,10 @@ else if (numRecordsConverted == numRecords) } } + private void expectExceptionWithZStd(CompressionType compressionType, byte magic) { + if (compressionType == CompressionType.ZSTD && magic < MAGIC_VALUE_V2) { + exceptionRule.expect(IllegalArgumentException.class); + exceptionRule.expectMessage("ZStandard compression is not supported for magic " + magic); + } + } } diff --git a/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java b/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java index aaafeeee544b4..5f16acfb9e3c4 100644 --- a/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java +++ b/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java @@ -74,10 +74,7 @@ public MemoryRecordsTest(byte magic, long firstOffset, CompressionType compressi @Test public void testIterator() { - if (compression == CompressionType.ZSTD && magic < MAGIC_VALUE_V2) { - exceptionRule.expect(IllegalArgumentException.class); - exceptionRule.expectMessage("ZStandard compression is not supported for magic " + magic); - } + expectExceptionWithZStd(compression, magic); ByteBuffer buffer = ByteBuffer.allocate(1024); @@ -162,10 +159,7 @@ public void testIterator() { @Test public void testHasRoomForMethod() { - if (compression == CompressionType.ZSTD && magic < MAGIC_VALUE_V2) { - exceptionRule.expect(IllegalArgumentException.class); - exceptionRule.expectMessage("ZStandard compression is not supported for magic " + magic); - } + expectExceptionWithZStd(compression, magic); MemoryRecordsBuilder builder = MemoryRecords.builder(ByteBuffer.allocate(1024), magic, compression, TimestampType.CREATE_TIME, 0L); @@ -454,10 +448,7 @@ public void testBuildEndTxnMarker() { @Test public void testFilterToBatchDiscard() { if (compression != CompressionType.NONE || magic >= RecordBatch.MAGIC_VALUE_V2) { - if (compression == CompressionType.ZSTD && magic < MAGIC_VALUE_V2) { - exceptionRule.expect(IllegalArgumentException.class); - exceptionRule.expectMessage("ZStandard compression is not supported for magic " + magic); - } + expectExceptionWithZStd(compression, magic); ByteBuffer buffer = ByteBuffer.allocate(2048); MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, magic, compression, TimestampType.CREATE_TIME, 0L); @@ -509,10 +500,7 @@ protected boolean shouldRetainRecord(RecordBatch recordBatch, Record record) { @Test public void testFilterToAlreadyCompactedLog() { - if (compression == CompressionType.ZSTD && magic < MAGIC_VALUE_V2) { - exceptionRule.expect(IllegalArgumentException.class); - exceptionRule.expectMessage("ZStandard compression is not supported for magic " + magic); - } + expectExceptionWithZStd(compression, magic); ByteBuffer buffer = ByteBuffer.allocate(2048); @@ -654,10 +642,7 @@ public void testFilterToPreservesProducerInfo() { @Test public void testFilterToWithUndersizedBuffer() { - if (compression == CompressionType.ZSTD && magic < MAGIC_VALUE_V2) { - exceptionRule.expect(IllegalArgumentException.class); - exceptionRule.expectMessage("ZStandard compression is not supported for magic " + magic); - } + expectExceptionWithZStd(compression, magic); ByteBuffer buffer = ByteBuffer.allocate(1024); MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, magic, compression, TimestampType.CREATE_TIME, 0L); @@ -709,10 +694,7 @@ public void testFilterToWithUndersizedBuffer() { @Test public void testToString() { - if (compression == CompressionType.ZSTD && magic < MAGIC_VALUE_V2) { - exceptionRule.expect(IllegalArgumentException.class); - exceptionRule.expectMessage("ZStandard compression is not supported for magic " + magic); - } + expectExceptionWithZStd(compression, magic); long timestamp = 1000000; MemoryRecords memoryRecords = MemoryRecords.withRecords(magic, compression, @@ -744,10 +726,7 @@ public void testToString() { @Test public void testFilterTo() { - if (compression == CompressionType.ZSTD && magic < MAGIC_VALUE_V2) { - exceptionRule.expect(IllegalArgumentException.class); - exceptionRule.expectMessage("ZStandard compression is not supported for magic " + magic); - } + expectExceptionWithZStd(compression, magic); ByteBuffer buffer = ByteBuffer.allocate(2048); MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, magic, compression, TimestampType.CREATE_TIME, 0L); @@ -862,10 +841,7 @@ public void testFilterTo() { @Test public void testFilterToPreservesLogAppendTime() { - if (compression == CompressionType.ZSTD && magic < MAGIC_VALUE_V2) { - exceptionRule.expect(IllegalArgumentException.class); - exceptionRule.expectMessage("ZStandard compression is not supported for magic " + magic); - } + expectExceptionWithZStd(compression, magic); long logAppendTime = System.currentTimeMillis(); @@ -911,10 +887,7 @@ public void testFilterToPreservesLogAppendTime() { @Test public void testNextBatchSize() { - if (compression == CompressionType.ZSTD && magic < MAGIC_VALUE_V2) { - exceptionRule.expect(IllegalArgumentException.class); - exceptionRule.expectMessage("ZStandard compression is not supported for magic " + magic); - } + expectExceptionWithZStd(compression, magic); ByteBuffer buffer = ByteBuffer.allocate(2048); MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, magic, compression, @@ -955,6 +928,13 @@ public void testNextBatchSize() { } } + private void expectExceptionWithZStd(CompressionType compressionType, byte magic) { + if (compressionType == CompressionType.ZSTD && magic < MAGIC_VALUE_V2) { + exceptionRule.expect(IllegalArgumentException.class); + exceptionRule.expectMessage("ZStandard compression is not supported for magic " + magic); + } + } + @Parameterized.Parameters(name = "{index} magic={0}, firstOffset={1}, compressionType={2}") public static Collection data() { List values = new ArrayList<>(); From 36d77afe44c800376c5a2fa03746a023cefa1fa1 Mon Sep 17 00:00:00 2001 From: Lee Dongjin Date: Sat, 6 Oct 2018 01:48:10 +0900 Subject: [PATCH 19/22] Remove ProduceRequest version check in KafkaApis#handleProduceRequest method --- core/src/main/scala/kafka/server/KafkaApis.scala | 6 +----- .../test/scala/unit/kafka/server/ProduceRequestTest.scala | 5 ----- 2 files changed, 1 insertion(+), 10 deletions(-) diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index c2ed7b226220a..77fa2a37ee45b 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -375,7 +375,6 @@ class KafkaApis(val requestChannel: RequestChannel, * Handle a produce request */ def handleProduceRequest(request: RequestChannel.Request) { - val versionId = request.header.apiVersion val produceRequest = request.body[ProduceRequest] val numBytesAppended = request.header.toStruct.sizeOf + request.sizeOfBodyInBytes @@ -393,7 +392,6 @@ class KafkaApis(val requestChannel: RequestChannel, val unauthorizedTopicResponses = mutable.Map[TopicPartition, PartitionResponse]() val nonExistingTopicResponses = mutable.Map[TopicPartition, PartitionResponse]() - val erroneousTopicResponses = mutable.Map[TopicPartition, PartitionResponse]() val authorizedRequestInfo = mutable.Map[TopicPartition, MemoryRecords]() for ((topicPartition, memoryRecords) <- produceRequest.partitionRecordsOrFail.asScala) { @@ -401,15 +399,13 @@ class KafkaApis(val requestChannel: RequestChannel, unauthorizedTopicResponses += topicPartition -> new PartitionResponse(Errors.TOPIC_AUTHORIZATION_FAILED) else if (!metadataCache.contains(topicPartition)) nonExistingTopicResponses += topicPartition -> new PartitionResponse(Errors.UNKNOWN_TOPIC_OR_PARTITION) - else if (replicaManager.getLogConfig(topicPartition).forall(_.compressionType == "zstd") && versionId < 7) - erroneousTopicResponses += (topicPartition -> new PartitionResponse(Errors.UNSUPPORTED_COMPRESSION_TYPE)) else authorizedRequestInfo += (topicPartition -> memoryRecords) } // the callback for sending a produce response def sendResponseCallback(responseStatus: Map[TopicPartition, PartitionResponse]) { - val mergedResponseStatus = responseStatus ++ unauthorizedTopicResponses ++ nonExistingTopicResponses ++ erroneousTopicResponses + val mergedResponseStatus = responseStatus ++ unauthorizedTopicResponses ++ nonExistingTopicResponses var errorInResponse = false mergedResponseStatus.foreach { case (topicPartition, status) => diff --git a/core/src/test/scala/unit/kafka/server/ProduceRequestTest.scala b/core/src/test/scala/unit/kafka/server/ProduceRequestTest.scala index 3da6ecb11999a..b1f3af145b958 100644 --- a/core/src/test/scala/unit/kafka/server/ProduceRequestTest.scala +++ b/core/src/test/scala/unit/kafka/server/ProduceRequestTest.scala @@ -130,11 +130,6 @@ class ProduceRequestTest extends BaseRequestTest { val topicPartition = new TopicPartition("topic", partition) val partitionRecords = Map(topicPartition -> memoryRecords) - // produce request with version below v7: UNSUPPORTED_COMPRESSION_TYPE error occurs - val res0 = sendProduceRequest(leader, - new ProduceRequest.Builder(3, 6, -1, 3000, partitionRecords.asJava, null).build()) - assertEquals(Errors.UNSUPPORTED_COMPRESSION_TYPE, res0.responses.asScala.head._2.error) - // produce request with v7: works fine! val res1 = sendProduceRequest(leader, new ProduceRequest.Builder(7, 7, -1, 3000, partitionRecords.asJava, null).build()) From 54eaa6122b7ccbcf3532b9a89be43e44d1755f4e Mon Sep 17 00:00:00 2001 From: Lee Dongjin Date: Sat, 6 Oct 2018 02:02:28 +0900 Subject: [PATCH 20/22] Add validation for the case magic < 2 with Zstandard Compression --- .../record/AbstractLegacyRecordBatch.java | 2 + .../record/AbstractLegacyRecordBatchTest.java | 38 +++++++++++++++++++ 2 files changed, 40 insertions(+) diff --git a/clients/src/main/java/org/apache/kafka/common/record/AbstractLegacyRecordBatch.java b/clients/src/main/java/org/apache/kafka/common/record/AbstractLegacyRecordBatch.java index 6ac073dd29801..cdf731c3bf2d0 100644 --- a/clients/src/main/java/org/apache/kafka/common/record/AbstractLegacyRecordBatch.java +++ b/clients/src/main/java/org/apache/kafka/common/record/AbstractLegacyRecordBatch.java @@ -322,6 +322,8 @@ private DeepRecordsIterator(AbstractLegacyRecordBatch wrapperEntry, throw new InvalidRecordException("Invalid wrapper magic found in legacy deep record iterator " + wrapperMagic); CompressionType compressionType = wrapperRecord.compressionType(); + if (compressionType == CompressionType.ZSTD) + throw new InvalidRecordException("Invalid wrapper compressionType found in legacy deep record iterator " + wrapperMagic); ByteBuffer wrapperValue = wrapperRecord.value(); if (wrapperValue == null) throw new InvalidRecordException("Found invalid compressed record set with null value (magic = " + diff --git a/clients/src/test/java/org/apache/kafka/common/record/AbstractLegacyRecordBatchTest.java b/clients/src/test/java/org/apache/kafka/common/record/AbstractLegacyRecordBatchTest.java index 83ada730ce24f..fe6ffabaf61eb 100644 --- a/clients/src/test/java/org/apache/kafka/common/record/AbstractLegacyRecordBatchTest.java +++ b/clients/src/test/java/org/apache/kafka/common/record/AbstractLegacyRecordBatchTest.java @@ -25,6 +25,7 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; public class AbstractLegacyRecordBatchTest { @@ -208,4 +209,41 @@ public void testSetCreateTimeV1() { assertEquals(expectedTimestamp++, record.timestamp()); } + @Test + public void testZStdCompressionTypeWithV0OrV1() { + SimpleRecord[] simpleRecords = new SimpleRecord[] { + new SimpleRecord(1L, "a".getBytes(), "1".getBytes()), + new SimpleRecord(2L, "b".getBytes(), "2".getBytes()), + new SimpleRecord(3L, "c".getBytes(), "3".getBytes()) + }; + + // Check V0 + try { + MemoryRecords records = MemoryRecords.withRecords(RecordBatch.MAGIC_VALUE_V0, 0L, + CompressionType.ZSTD, TimestampType.CREATE_TIME, simpleRecords); + + ByteBufferLegacyRecordBatch batch = new ByteBufferLegacyRecordBatch(records.buffer()); + batch.setLastOffset(1L); + + batch.iterator(); + fail("Can't reach here"); + } catch (IllegalArgumentException e) { + assertEquals("ZStandard compression is not supported for magic 0", e.getMessage()); + } + + // Check V1 + try { + MemoryRecords records = MemoryRecords.withRecords(RecordBatch.MAGIC_VALUE_V1, 0L, + CompressionType.ZSTD, TimestampType.CREATE_TIME, simpleRecords); + + ByteBufferLegacyRecordBatch batch = new ByteBufferLegacyRecordBatch(records.buffer()); + batch.setLastOffset(1L); + + batch.iterator(); + fail("Can't reach here"); + } catch (IllegalArgumentException e) { + assertEquals("ZStandard compression is not supported for magic 1", e.getMessage()); + } + } + } From 65c57a1b2024f1a1305295252d1742c68a4ea2c4 Mon Sep 17 00:00:00 2001 From: Lee Dongjin Date: Sat, 6 Oct 2018 02:06:56 +0900 Subject: [PATCH 21/22] Consistency: all log or exception messages are using 'ZStandard', not 'Zstd' --- core/src/main/scala/kafka/server/KafkaApis.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index 77fa2a37ee45b..ecbbdb6f03fa1 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -539,7 +539,7 @@ class KafkaApis(val requestChannel: RequestChannel, val logConfig = replicaManager.getLogConfig(tp) if (logConfig.forall(_.compressionType == ZStdCompressionCodec.name) && versionId < 10) { - trace(s"Fetching messages is disabled for Zstd compressed partition $tp. Sending unsupported version response to $clientId.") + trace(s"Fetching messages is disabled for ZStandard compressed partition $tp. Sending unsupported version response to $clientId.") errorResponse(Errors.UNSUPPORTED_COMPRESSION_TYPE) } else { // Down-conversion of the fetched records is needed when the stored magic version is From 147c47358b3d9483de633b855ff809f64b9befef Mon Sep 17 00:00:00 2001 From: Lee Dongjin Date: Sat, 6 Oct 2018 02:33:50 +0900 Subject: [PATCH 22/22] Add validation for ProduceRequest: update ProduceRequest.validateRecords --- .../kafka/common/requests/ProduceRequest.java | 5 +++++ .../common/requests/ProduceRequestTest.java | 20 +++++++++++++++++++ 2 files changed, 25 insertions(+) diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java index a03ad582cc726..f87090eba6a1f 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java @@ -24,6 +24,7 @@ import org.apache.kafka.common.protocol.types.Field; import org.apache.kafka.common.protocol.types.Schema; import org.apache.kafka.common.protocol.types.Struct; +import org.apache.kafka.common.record.CompressionType; import org.apache.kafka.common.record.InvalidRecordException; import org.apache.kafka.common.record.MemoryRecords; import org.apache.kafka.common.record.MutableRecordBatch; @@ -251,6 +252,10 @@ private void validateRecords(short version, MemoryRecords records) { if (entry.magic() != RecordBatch.MAGIC_VALUE_V2) throw new InvalidRecordException("Produce requests with version " + version + " are only allowed to " + "contain record batches with magic version 2"); + if (version < 7 && entry.compressionType() == CompressionType.ZSTD) { + throw new InvalidRecordException("Produce requests with version " + version + " are note allowed to " + + "use ZStandard compression"); + } if (iterator.hasNext()) throw new InvalidRecordException("Produce requests with version " + version + " are only allowed to " + diff --git a/clients/src/test/java/org/apache/kafka/common/requests/ProduceRequestTest.java b/clients/src/test/java/org/apache/kafka/common/requests/ProduceRequestTest.java index ef17c96c5ad01..74e3960dd47a2 100644 --- a/clients/src/test/java/org/apache/kafka/common/requests/ProduceRequestTest.java +++ b/clients/src/test/java/org/apache/kafka/common/requests/ProduceRequestTest.java @@ -158,6 +158,26 @@ public void testV3AndAboveCannotUseMagicV1() { assertThrowsInvalidRecordExceptionForAllVersions(requestBuilder); } + @Test + public void testV6AndBelowCannotUseZStdCompression() { + ByteBuffer buffer = ByteBuffer.allocate(256); + MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, RecordBatch.MAGIC_VALUE_V2, CompressionType.ZSTD, + TimestampType.CREATE_TIME, 0L); + builder.append(10L, null, "a".getBytes()); + + Map produceData = new HashMap<>(); + produceData.put(new TopicPartition("test", 0), builder.build()); + + // Can't create ProduceRequest instance with version within [3, 7) + for (short version = 3; version < 7; version++) { + ProduceRequest.Builder requestBuilder = new ProduceRequest.Builder(version, version, (short) 1, 5000, produceData, null); + assertThrowsInvalidRecordExceptionForAllVersions(requestBuilder); + } + + // Works fine with current version (>= 7) + ProduceRequest.Builder.forCurrentMagic((short) 1, 5000, produceData); + } + private void assertThrowsInvalidRecordExceptionForAllVersions(ProduceRequest.Builder builder) { for (short version = builder.oldestAllowedVersion(); version < builder.latestAllowedVersion(); version++) { assertThrowsInvalidRecordException(builder, version);