From 166a7ab5e1082646aebee4f152d968e3c0d102f7 Mon Sep 17 00:00:00 2001 From: Xiaoyu Hou Date: Fri, 30 Sep 2022 10:33:37 +0800 Subject: [PATCH] [improve][java-client]Add init capacity for messages in BatchMessageContainerImpl (#17822) (cherry picked from commit 15a347ca999befe3ea3bd246d34309ad50fbcbe2) --- .../impl/AbstractBatchMessageContainer.java | 8 +++ .../impl/BatchMessageContainerImpl.java | 5 +- .../impl/BatchMessageContainerImplTest.java | 64 +++++++++++++++++++ 3 files changed, 75 insertions(+), 2 deletions(-) diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/AbstractBatchMessageContainer.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/AbstractBatchMessageContainer.java index 73f1e6d088906..9b4d1b7d683dd 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/AbstractBatchMessageContainer.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/AbstractBatchMessageContainer.java @@ -18,6 +18,7 @@ */ package org.apache.pulsar.client.impl; +import com.google.common.annotations.VisibleForTesting; import java.io.IOException; import java.util.List; import lombok.extern.slf4j.Slf4j; @@ -46,10 +47,12 @@ public abstract class AbstractBatchMessageContainer implements BatchMessageConta protected long currentTxnidLeastBits = -1L; protected static final int INITIAL_BATCH_BUFFER_SIZE = 1024; + protected static final int INITIAL_MESSAGES_NUM = 32; // This will be the largest size for a batch sent from this particular producer. 
This is used as a baseline to // allocate a new buffer that can hold the entire batch without needing costly reallocations protected int maxBatchSize = INITIAL_BATCH_BUFFER_SIZE; + protected int maxMessagesNum = INITIAL_MESSAGES_NUM; @Override public boolean haveEnoughSpace(MessageImpl msg) { @@ -71,6 +74,11 @@ public int getNumMessagesInBatch() { return numMessagesInBatch; } + @VisibleForTesting + public int getMaxMessagesNum() { + return maxMessagesNum; + } + @Override public long getCurrentBatchSize() { return currentBatchSizeBytes; diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java index 0a6fedf0ab605..cfc0e1f98c139 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java @@ -49,7 +49,7 @@ class BatchMessageContainerImpl extends AbstractBatchMessageContainer { private long lowestSequenceId = -1L; private long highestSequenceId = -1L; private ByteBuf batchedMessageMetadataAndPayload; - private List<MessageImpl<?>> messages = new ArrayList<>(); + private List<MessageImpl<?>> messages = new ArrayList<>(maxMessagesNum); protected SendCallback previousCallback = null; // keep track of callbacks for individual messages being published in a batch protected SendCallback firstCallback; @@ -139,12 +139,13 @@ private ByteBuf getCompressedBatchMetadataAndPayload() { // Update the current max batch size using the uncompressed size, which is what we need in any case to // accumulate the batch content maxBatchSize = Math.max(maxBatchSize, uncompressedSize); + maxMessagesNum = Math.max(maxMessagesNum, numMessagesInBatch); return compressedPayload; } @Override public void clear() { - messages = new ArrayList<>(); + messages = new ArrayList<>(maxMessagesNum); firstCallback = null; previousCallback = null; messageMetadata.clear(); 
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/BatchMessageContainerImplTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/BatchMessageContainerImplTest.java index 8fc018b3199d5..69aaf95385b70 100644 --- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/BatchMessageContainerImplTest.java +++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/BatchMessageContainerImplTest.java @@ -18,13 +18,20 @@ */ package org.apache.pulsar.client.impl; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.mock; import static org.mockito.ArgumentMatchers.anyInt; import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.when; +import static org.testng.Assert.assertEquals; +import io.netty.buffer.ByteBufAllocator; +import io.netty.util.ReferenceCountUtil; +import java.lang.reflect.Field; import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; +import java.util.ArrayList; import java.util.concurrent.atomic.AtomicBoolean; import org.apache.bookkeeper.common.allocator.impl.ByteBufAllocatorImpl; import org.apache.pulsar.client.api.CompressionType; @@ -35,6 +42,7 @@ import org.mockito.MockedConstruction; import org.mockito.Mockito; import org.powermock.reflect.Whitebox; +import org.testng.Assert; import org.testng.annotations.Test; public class BatchMessageContainerImplTest { @@ -84,4 +92,60 @@ private void replaceByteBufAllocator() throws NoSuchMethodException, IllegalAcce createByteBufAllocatorMethod.invoke(null)); } + + @Test + public void testMessagesSize() throws Exception { + ProducerImpl producer = mock(ProducerImpl.class); + + final ProducerConfigurationData producerConfigurationData = new ProducerConfigurationData(); + producerConfigurationData.setCompressionType(CompressionType.NONE); + PulsarClientImpl pulsarClient = mock(PulsarClientImpl.class); + MemoryLimitController 
memoryLimitController = mock(MemoryLimitController.class); + when(pulsarClient.getMemoryLimitController()).thenReturn(memoryLimitController); + try { + Field clientFiled = HandlerState.class.getDeclaredField("client"); + clientFiled.setAccessible(true); + clientFiled.set(producer, pulsarClient); + } catch (Exception e){ + Assert.fail(e.getMessage()); + } + + ByteBuffer payload = ByteBuffer.wrap("payload".getBytes(StandardCharsets.UTF_8)); + + when(producer.getConfiguration()).thenReturn(producerConfigurationData); + when(producer.encryptMessage(any(), any())).thenReturn(ByteBufAllocator.DEFAULT.buffer().writeBytes(payload)); + + final int initNum = 32; + BatchMessageContainerImpl batchMessageContainer = new BatchMessageContainerImpl(); + batchMessageContainer.setProducer(producer); + assertEquals(batchMessageContainer.getMaxMessagesNum(), initNum); + + addMessagesAndCreateOpSendMsg(batchMessageContainer, 10); + assertEquals(batchMessageContainer.getMaxMessagesNum(), initNum); + + addMessagesAndCreateOpSendMsg(batchMessageContainer, 200); + assertEquals(batchMessageContainer.getMaxMessagesNum(), 200); + + addMessagesAndCreateOpSendMsg(batchMessageContainer, 10); + assertEquals(batchMessageContainer.getMaxMessagesNum(), 200); + } + + private void addMessagesAndCreateOpSendMsg(BatchMessageContainerImpl batchMessageContainer, int num) + throws Exception{ + ArrayList<MessageImpl<?>> messages = new ArrayList<>(); + for (int i = 0; i < num; ++i) { + MessageMetadata messageMetadata = new MessageMetadata(); + messageMetadata.setSequenceId(i); + messageMetadata.setProducerName("producer"); + messageMetadata.setPublishTime(System.currentTimeMillis()); + ByteBuffer payload = ByteBuffer.wrap("payload".getBytes(StandardCharsets.UTF_8)); + MessageImpl message = MessageImpl.create(messageMetadata, payload, Schema.BYTES, null); + messages.add(message); + batchMessageContainer.add(message, null); + } + + batchMessageContainer.createOpSendMsg(); + batchMessageContainer.clear(); + 
messages.forEach(ReferenceCountUtil::safeRelease); + } }