Skip to content

Commit

Permalink
[improve][java-client]Shrink BatchMessageContainer maxBatchSize (#17854)
Browse files Browse the repository at this point in the history
  • Loading branch information
AnonHxy authored Oct 2, 2022
1 parent e26060a commit 9ff9703
Show file tree
Hide file tree
Showing 3 changed files with 87 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,10 @@ public long getCurrentBatchSize() {
return currentBatchSizeBytes;
}

int getMaxBatchSize() {
return maxBatchSize;
}

@Override
public List<ProducerImpl.OpSendMsg> createOpSendMsgs() throws IOException {
throw new UnsupportedOperationException();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,8 @@ class BatchMessageContainerImpl extends AbstractBatchMessageContainer {
protected SendCallback firstCallback;

private final ByteBufAllocator allocator;
private static final int SHRINK_COOLING_OFF_PERIOD = 10;
private int consecutiveShrinkTime = 0;

public BatchMessageContainerImpl() {
this(PulsarByteBufAllocator.DEFAULT);
Expand Down Expand Up @@ -98,7 +100,8 @@ public boolean add(MessageImpl<?> msg, SendCallback callback) {
messageMetadata.setSequenceId(msg.getSequenceId());
lowestSequenceId = Commands.initBatchMessageMetadata(messageMetadata, msg.getMessageBuilder());
this.firstCallback = callback;
batchedMessageMetadataAndPayload = allocator.compositeBuffer();
batchedMessageMetadataAndPayload = allocator.buffer(
Math.min(maxBatchSize, ClientCnx.getMaxMessageSize()));
if (msg.getMessageBuilder().hasTxnidMostBits() && currentTxnidMostBits == -1) {
currentTxnidMostBits = msg.getMessageBuilder().getTxnidMostBits();
}
Expand Down Expand Up @@ -167,11 +170,30 @@ private ByteBuf getCompressedBatchMetadataAndPayload() {

// Update the current max batch size using the uncompressed size, which is what we need in any case to
// accumulate the batch content
maxBatchSize = Math.max(maxBatchSize, uncompressedSize);
updateMaxBatchSize(uncompressedSize);
maxMessagesNum = Math.max(maxMessagesNum, numMessagesInBatch);
return compressedPayload;
}

void updateMaxBatchSize(int uncompressedSize) {
if (uncompressedSize > maxBatchSize) {
maxBatchSize = uncompressedSize;
consecutiveShrinkTime = 0;
} else {
int shrank = maxBatchSize - (maxBatchSize >> 2);
if (uncompressedSize <= shrank) {
if (consecutiveShrinkTime <= SHRINK_COOLING_OFF_PERIOD) {
consecutiveShrinkTime++;
} else {
maxBatchSize = shrank;
consecutiveShrinkTime = 0;
}
} else {
consecutiveShrinkTime = 0;
}
}
}

@Override
public void clear() {
messages = new ArrayList<>(maxMessagesNum);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.pulsar.client.impl;

import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
Expand All @@ -40,6 +41,63 @@

public class BatchMessageContainerImplTest {

@Test
public void testUpdateMaxBatchSize() {
int SHRINK_COOLING_OFF_PERIOD = 10;
BatchMessageContainerImpl messageContainer = new BatchMessageContainerImpl();
// check init state
assertEquals(messageContainer.getMaxBatchSize(), 1024);

// test expand
messageContainer.updateMaxBatchSize(2048);
assertEquals(messageContainer.getMaxBatchSize(), 2048);

// test cooling-off period
messageContainer.updateMaxBatchSize(2);
assertEquals(messageContainer.getMaxBatchSize(), 2048);

// test shrink
for (int i = 0; i < 15; ++i) {
messageContainer.updateMaxBatchSize(2);
if (i < SHRINK_COOLING_OFF_PERIOD) {
assertEquals(messageContainer.getMaxBatchSize(), 2048);
} else {
assertEquals(messageContainer.getMaxBatchSize(), 2048 * 0.75);
}
}

messageContainer.updateMaxBatchSize(2048);
// test big message sudden appearance
for (int i = 0; i < 15; ++i) {
if (i == SHRINK_COOLING_OFF_PERIOD - 2) {
messageContainer.updateMaxBatchSize(2000);
} else {
messageContainer.updateMaxBatchSize(2);
}
assertEquals(messageContainer.getMaxBatchSize(), 2048);
}

// test big and small message alternating occurrence
for (int i = 0; i < SHRINK_COOLING_OFF_PERIOD * 3; ++i) {
if (i % 2 ==0) {
messageContainer.updateMaxBatchSize(2);
} else {
messageContainer.updateMaxBatchSize(2000);
}
assertEquals(messageContainer.getMaxBatchSize(), 2048);
}

// test consecutive big message
for (int i = 0; i < 15; ++i) {
messageContainer.updateMaxBatchSize(2000);
assertEquals(messageContainer.getMaxBatchSize(), 2048);
}

// test expand after shrink
messageContainer.updateMaxBatchSize(4096);
assertEquals(messageContainer.getMaxBatchSize(), 4096);
}

@Test
public void recoveryAfterOom() {
final AtomicBoolean called = new AtomicBoolean();
Expand All @@ -62,7 +120,7 @@ public void recoveryAfterOom() {
doAnswer((ignore) -> {
called.set(true);
throw new OutOfMemoryError("test");
}).when(mockAllocator).compositeBuffer();
}).when(mockAllocator).buffer(anyInt());
final BatchMessageContainerImpl batchMessageContainer = new BatchMessageContainerImpl(mockAllocator);
batchMessageContainer.setProducer(producer);
MessageMetadata messageMetadata1 = new MessageMetadata();
Expand Down

0 comments on commit 9ff9703

Please sign in to comment.