Skip to content

Commit

Permalink
[fix][common] Fix parsing partitionedKey with Base64 encode issue. (a…
Browse files Browse the repository at this point in the history
…pache#17687)

* Fix parsing partitionedKey with Base64 encode issue.

* release the buf

* fix checkstyle issue.
  • Loading branch information
Technoboy- authored Sep 16, 2022
1 parent af19f0e commit f3cc107
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Base64;
import java.util.Collections;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -1900,6 +1901,9 @@ public static byte[] peekStickyKey(ByteBuf metadataAndPayload, String topic, Str
if (metadata.hasOrderingKey()) {
return metadata.getOrderingKey();
} else if (metadata.hasPartitionKey()) {
if (metadata.isPartitionKeyB64Encoded()) {
return Base64.getDecoder().decode(metadata.getPartitionKey());
}
return metadata.getPartitionKey().getBytes(StandardCharsets.UTF_8);
}
} catch (Throwable t) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,22 +18,23 @@
*/
package org.apache.pulsar.common.compression;

import static java.nio.charset.StandardCharsets.UTF_8;
import static org.apache.pulsar.common.protocol.Commands.serializeMetadataAndPayload;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertTrue;

import com.scurrilous.circe.checksum.Crc32cIntChecksum;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufUtil;
import io.netty.buffer.Unpooled;

import java.io.IOException;

import java.util.Base64;
import io.netty.util.ReferenceCountUtil;
import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
import org.apache.pulsar.common.api.proto.MessageMetadata;
import org.apache.pulsar.common.protocol.ByteBufPair;
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.protocol.Commands.ChecksumType;
import org.testng.Assert;
import org.testng.annotations.Test;

public class CommandsTest {
Expand Down Expand Up @@ -93,5 +94,35 @@ private int computeChecksum(MessageMetadata msgMetadata, ByteBuf compressedPaylo
return computedChecksum;
}


@Test
public void testPeekStickyKey() {
String message = "msg-1";
String partitionedKey = "key1";
MessageMetadata messageMetadata2 = new MessageMetadata()
.setSequenceId(1)
.setProducerName("testProducer")
.setPartitionKey(partitionedKey)
.setPartitionKeyB64Encoded(false)
.setPublishTime(System.currentTimeMillis());
ByteBuf byteBuf = serializeMetadataAndPayload(Commands.ChecksumType.Crc32c, messageMetadata2,
Unpooled.copiedBuffer(message.getBytes(UTF_8)));
byte[] bytes = Commands.peekStickyKey(byteBuf, "topic-1", "sub-1");
String key = new String(bytes);
Assert.assertEquals(partitionedKey, key);
ReferenceCountUtil.safeRelease(byteBuf);
// test 64 encoded
String partitionedKey2 = Base64.getEncoder().encodeToString("key2".getBytes(UTF_8));
MessageMetadata messageMetadata = new MessageMetadata()
.setSequenceId(1)
.setProducerName("testProducer")
.setPartitionKey(partitionedKey2)
.setPartitionKeyB64Encoded(true)
.setPublishTime(System.currentTimeMillis());
ByteBuf byteBuf2 = serializeMetadataAndPayload(Commands.ChecksumType.Crc32c, messageMetadata,
Unpooled.copiedBuffer(message.getBytes(UTF_8)));
byte[] bytes2 = Commands.peekStickyKey(byteBuf2, "topic-2", "sub-2");
String key2 = Base64.getEncoder().encodeToString(bytes2);;
Assert.assertEquals(partitionedKey2, key2);
ReferenceCountUtil.safeRelease(byteBuf2);
}
}

0 comments on commit f3cc107

Please sign in to comment.