Skip to content

Commit

Permalink
[improve][java-client]Add init capacity for messages in BatchMessageC…
Browse files Browse the repository at this point in the history
…ontainerImpl (apache#17822)
  • Loading branch information
AnonHxy authored Sep 30, 2022
1 parent 5e8902c commit 15a347c
Show file tree
Hide file tree
Showing 3 changed files with 70 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.apache.pulsar.client.impl;

import com.google.common.annotations.VisibleForTesting;
import java.io.IOException;
import java.util.List;
import lombok.extern.slf4j.Slf4j;
Expand Down Expand Up @@ -46,10 +47,12 @@ public abstract class AbstractBatchMessageContainer implements BatchMessageConta
protected long currentTxnidLeastBits = -1L;

protected static final int INITIAL_BATCH_BUFFER_SIZE = 1024;
protected static final int INITIAL_MESSAGES_NUM = 32;

// This will be the largest size for a batch sent from this particular producer. This is used as a baseline to
// allocate a new buffer that can hold the entire batch without needing costly reallocations
protected int maxBatchSize = INITIAL_BATCH_BUFFER_SIZE;
protected int maxMessagesNum = INITIAL_MESSAGES_NUM;

@Override
public boolean haveEnoughSpace(MessageImpl<?> msg) {
Expand All @@ -71,6 +74,11 @@ public int getNumMessagesInBatch() {
return numMessagesInBatch;
}

@VisibleForTesting
public int getMaxMessagesNum() {
return maxMessagesNum;
}

@Override
public long getCurrentBatchSize() {
return currentBatchSizeBytes;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ class BatchMessageContainerImpl extends AbstractBatchMessageContainer {
@Setter
private long highestSequenceId = -1L;
private ByteBuf batchedMessageMetadataAndPayload;
private List<MessageImpl<?>> messages = new ArrayList<>();
private List<MessageImpl<?>> messages = new ArrayList<>(maxMessagesNum);
protected SendCallback previousCallback = null;
// keep track of callbacks for individual messages being published in a batch
protected SendCallback firstCallback;
Expand Down Expand Up @@ -168,12 +168,13 @@ private ByteBuf getCompressedBatchMetadataAndPayload() {
// Update the current max batch size using the uncompressed size, which is what we need in any case to
// accumulate the batch content
maxBatchSize = Math.max(maxBatchSize, uncompressedSize);
maxMessagesNum = Math.max(maxMessagesNum, numMessagesInBatch);
return compressedPayload;
}

@Override
public void clear() {
messages = new ArrayList<>();
messages = new ArrayList<>(maxMessagesNum);
firstCallback = null;
previousCallback = null;
messageMetadata.clear();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,18 @@
*/
package org.apache.pulsar.client.impl;

import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertTrue;
import io.netty.buffer.ByteBufAllocator;
import io.netty.util.ReferenceCountUtil;
import java.lang.reflect.Field;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.pulsar.client.api.CompressionType;
import org.apache.pulsar.client.api.Schema;
Expand Down Expand Up @@ -78,4 +82,59 @@ public void recoveryAfterOom() {
// after oom, our add can self-healing, won't throw exception
batchMessageContainer.add(message2, null);
}

@Test
public void testMessagesSize() throws Exception {
ProducerImpl producer = mock(ProducerImpl.class);

final ProducerConfigurationData producerConfigurationData = new ProducerConfigurationData();
producerConfigurationData.setCompressionType(CompressionType.NONE);
PulsarClientImpl pulsarClient = mock(PulsarClientImpl.class);
MemoryLimitController memoryLimitController = mock(MemoryLimitController.class);
when(pulsarClient.getMemoryLimitController()).thenReturn(memoryLimitController);
try {
Field clientFiled = HandlerState.class.getDeclaredField("client");
clientFiled.setAccessible(true);
clientFiled.set(producer, pulsarClient);
} catch (Exception e){
Assert.fail(e.getMessage());
}

ByteBuffer payload = ByteBuffer.wrap("payload".getBytes(StandardCharsets.UTF_8));

when(producer.getConfiguration()).thenReturn(producerConfigurationData);
when(producer.encryptMessage(any(), any())).thenReturn(ByteBufAllocator.DEFAULT.buffer().writeBytes(payload));

final int initNum = 32;
BatchMessageContainerImpl batchMessageContainer = new BatchMessageContainerImpl(producer);
assertEquals(batchMessageContainer.getMaxMessagesNum(), initNum);

addMessagesAndCreateOpSendMsg(batchMessageContainer, 10);
assertEquals(batchMessageContainer.getMaxMessagesNum(), initNum);

addMessagesAndCreateOpSendMsg(batchMessageContainer, 200);
assertEquals(batchMessageContainer.getMaxMessagesNum(), 200);

addMessagesAndCreateOpSendMsg(batchMessageContainer, 10);
assertEquals(batchMessageContainer.getMaxMessagesNum(), 200);
}

private void addMessagesAndCreateOpSendMsg(BatchMessageContainerImpl batchMessageContainer, int num)
throws Exception{
ArrayList<MessageImpl<?>> messages = new ArrayList<>();
for (int i = 0; i < num; ++i) {
MessageMetadata messageMetadata = new MessageMetadata();
messageMetadata.setSequenceId(i);
messageMetadata.setProducerName("producer");
messageMetadata.setPublishTime(System.currentTimeMillis());
ByteBuffer payload = ByteBuffer.wrap("payload".getBytes(StandardCharsets.UTF_8));
MessageImpl<?> message = MessageImpl.create(messageMetadata, payload, Schema.BYTES, null);
messages.add(message);
batchMessageContainer.add(message, null);
}

batchMessageContainer.createOpSendMsg();
batchMessageContainer.clear();
messages.forEach(ReferenceCountUtil::safeRelease);
}
}

0 comments on commit 15a347c

Please sign in to comment.