Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[improve][java-client]Add init capacity for messages in BatchMessageContainerImpl #17822

Merged
merged 5 commits into from
Sep 30, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.apache.pulsar.client.impl;

import com.google.common.annotations.VisibleForTesting;
import java.io.IOException;
import java.util.List;
import lombok.extern.slf4j.Slf4j;
Expand Down Expand Up @@ -46,10 +47,12 @@ public abstract class AbstractBatchMessageContainer implements BatchMessageConta
protected long currentTxnidLeastBits = -1L;

protected static final int INITIAL_BATCH_BUFFER_SIZE = 1024;
protected static final int INITIAL_MESSAGES_NUM = 32;

// This will be the largest size for a batch sent from this particular producer. This is used as a baseline to
// allocate a new buffer that can hold the entire batch without needing costly reallocations
protected int maxBatchSize = INITIAL_BATCH_BUFFER_SIZE;
protected int maxMessagesNum = INITIAL_MESSAGES_NUM;

@Override
public boolean haveEnoughSpace(MessageImpl<?> msg) {
Expand All @@ -71,6 +74,11 @@ public int getNumMessagesInBatch() {
return numMessagesInBatch;
}

@VisibleForTesting
public int getMaxMessagesNum() {
return maxMessagesNum;
}

@Override
public long getCurrentBatchSize() {
return currentBatchSizeBytes;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ class BatchMessageContainerImpl extends AbstractBatchMessageContainer {
@Setter
private long highestSequenceId = -1L;
private ByteBuf batchedMessageMetadataAndPayload;
private List<MessageImpl<?>> messages = new ArrayList<>();
private List<MessageImpl<?>> messages = new ArrayList<>(maxMessagesNum);
protected SendCallback previousCallback = null;
// keep track of callbacks for individual messages being published in a batch
protected SendCallback firstCallback;
Expand Down Expand Up @@ -168,12 +168,13 @@ private ByteBuf getCompressedBatchMetadataAndPayload() {
// Update the current max batch size using the uncompressed size, which is what we need in any case to
// accumulate the batch content
maxBatchSize = Math.max(maxBatchSize, uncompressedSize);
maxMessagesNum = Math.max(maxMessagesNum, numMessagesInBatch);
return compressedPayload;
}

@Override
public void clear() {
messages = new ArrayList<>();
messages = new ArrayList<>(maxMessagesNum);
Jason918 marked this conversation as resolved.
Show resolved Hide resolved
firstCallback = null;
previousCallback = null;
messageMetadata.clear();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,18 @@
*/
package org.apache.pulsar.client.impl;

import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertTrue;
import io.netty.buffer.ByteBufAllocator;
import io.netty.util.ReferenceCountUtil;
import java.lang.reflect.Field;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.pulsar.client.api.CompressionType;
import org.apache.pulsar.client.api.Schema;
Expand Down Expand Up @@ -78,4 +82,59 @@ public void recoveryAfterOom() {
// after oom, our add can self-healing, won't throw exception
batchMessageContainer.add(message2, null);
}

@Test
public void testMessagesSize() throws Exception {
ProducerImpl producer = mock(ProducerImpl.class);

final ProducerConfigurationData producerConfigurationData = new ProducerConfigurationData();
producerConfigurationData.setCompressionType(CompressionType.NONE);
PulsarClientImpl pulsarClient = mock(PulsarClientImpl.class);
MemoryLimitController memoryLimitController = mock(MemoryLimitController.class);
when(pulsarClient.getMemoryLimitController()).thenReturn(memoryLimitController);
try {
Field clientFiled = HandlerState.class.getDeclaredField("client");
clientFiled.setAccessible(true);
clientFiled.set(producer, pulsarClient);
} catch (Exception e){
Assert.fail(e.getMessage());
}

ByteBuffer payload = ByteBuffer.wrap("payload".getBytes(StandardCharsets.UTF_8));

when(producer.getConfiguration()).thenReturn(producerConfigurationData);
when(producer.encryptMessage(any(), any())).thenReturn(ByteBufAllocator.DEFAULT.buffer().writeBytes(payload));

final int initNum = 32;
BatchMessageContainerImpl batchMessageContainer = new BatchMessageContainerImpl(producer);
assertEquals(batchMessageContainer.getMaxMessagesNum(), initNum);

addMessagesAndCreateOpSendMsg(batchMessageContainer, 10);
assertEquals(batchMessageContainer.getMaxMessagesNum(), initNum);

addMessagesAndCreateOpSendMsg(batchMessageContainer, 200);
assertEquals(batchMessageContainer.getMaxMessagesNum(), 200);

addMessagesAndCreateOpSendMsg(batchMessageContainer, 10);
assertEquals(batchMessageContainer.getMaxMessagesNum(), 200);
}

private void addMessagesAndCreateOpSendMsg(BatchMessageContainerImpl batchMessageContainer, int num)
throws Exception{
ArrayList<MessageImpl<?>> messages = new ArrayList<>();
for (int i = 0; i < num; ++i) {
MessageMetadata messageMetadata = new MessageMetadata();
messageMetadata.setSequenceId(i);
messageMetadata.setProducerName("producer");
messageMetadata.setPublishTime(System.currentTimeMillis());
ByteBuffer payload = ByteBuffer.wrap("payload".getBytes(StandardCharsets.UTF_8));
MessageImpl<?> message = MessageImpl.create(messageMetadata, payload, Schema.BYTES, null);
messages.add(message);
batchMessageContainer.add(message, null);
}

batchMessageContainer.createOpSendMsg();
batchMessageContainer.clear();
messages.forEach(ReferenceCountUtil::safeRelease);
}
}