Skip to content

Commit

Permalink
[fix][cli] Quit PerformanceConsumer after receiving numMessages messa…
Browse files Browse the repository at this point in the history
…ges (#17750)

(cherry picked from commit 59ce90c)
  • Loading branch information
Andras Beni authored and congbobo184 committed Dec 7, 2022
1 parent 85e65aa commit 0c1aab8
Show file tree
Hide file tree
Showing 3 changed files with 98 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -395,6 +395,12 @@ public static void main(String[] args) throws Exception {
totalMessagesReceived.increment();
totalBytesReceived.add(msg.size());

if (arguments.numMessages > 0 && totalMessagesReceived.sum() >= arguments.numMessages) {
log.info("------------------- DONE -----------------------");
PerfClientUtils.exit(0);
thread.interrupt();
}

if (limiter != null) {
limiter.acquire();
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.tests.integration.cli;

import static org.testng.Assert.fail;
import org.apache.pulsar.tests.integration.containers.ChaosContainer;
import org.apache.pulsar.tests.integration.containers.PulsarContainer;
import org.apache.pulsar.tests.integration.containers.ZKContainer;
import org.apache.pulsar.tests.integration.docker.ContainerExecResult;
import org.apache.pulsar.tests.integration.messaging.TopicMessagingBase;
import org.testng.Assert;
import org.testng.annotations.Test;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class PerfToolTest extends TopicMessagingBase {

private static final int MESSAGE_COUNT = 50;

@Test
private void testProduce() throws Exception {
String serviceUrl = "pulsar://" + pulsarCluster.getProxy().getContainerName() + ":" + PulsarContainer.BROKER_PORT;
final String topicName = getNonPartitionedTopic("testProduce", true);
// Using the ZK container as it is separate from brokers, so its environment resembles real world usage more
ZKContainer<?> clientToolContainer = pulsarCluster.getZooKeeper();
ContainerExecResult produceResult = produceWithPerfTool(clientToolContainer, serviceUrl, topicName);
checkOutputForLogs(produceResult,"PerformanceProducer - Aggregated throughput stats",
"PerformanceProducer - Aggregated latency stats");
}

@Test
private void testConsume() throws Exception {
String serviceUrl = "pulsar://" + pulsarCluster.getProxy().getContainerName() + ":" + PulsarContainer.BROKER_PORT;
final String topicName = getNonPartitionedTopic("testConsume", true);
// Using the ZK container as it is separate from brokers, so its environment resembles real world usage more
ZKContainer<?> clientToolContainer = pulsarCluster.getZooKeeper();
ContainerExecResult consumeResult = consumeWithPerfTool(clientToolContainer, serviceUrl, topicName);
checkOutputForLogs(consumeResult,"PerformanceConsumer - Aggregated throughput stats",
"PerformanceConsumer - Aggregated latency stats");
}

private ContainerExecResult produceWithPerfTool(ChaosContainer<?> container, String url, String topic) throws Exception {
ContainerExecResult result = container.execCmd("bin/pulsar-perf", "produce", "-u", url, "-m", String.valueOf(MESSAGE_COUNT), topic);

return failOnError("Performance producer", result);
}

private ContainerExecResult consumeWithPerfTool(ChaosContainer<?> container, String url, String topic) throws Exception {
CompletableFuture<ContainerExecResult> resultFuture =
container.execCmdAsync("bin/pulsar-perf", "consume", "-u", url, "-m", String.valueOf(MESSAGE_COUNT), topic);
produceWithPerfTool(container, url, topic);

ContainerExecResult result = resultFuture.get(5, TimeUnit.SECONDS);
return failOnError("Performance consumer", result);
}

private static ContainerExecResult failOnError(String processDesc, ContainerExecResult result) {
if (result.getExitCode() != 0) {
fail(processDesc + " failed. Command output:\n" + result.getStdout()
+ "\nError output:\n" + result.getStderr());
}
return result;
}

private static void checkOutputForLogs(ContainerExecResult result, String... logs) {
String output = result.getStdout();
for (String log : logs) {
Assert.assertTrue(output.contains(log),
"command output did not contain log message '" + log + "'.\nFull stdout is:\n" + output);
}
}

}
1 change: 1 addition & 0 deletions tests/integration/src/test/resources/pulsar-cli.xml
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
<class name="org.apache.pulsar.tests.integration.cli.FunctionsCLITest"/>
<class name="org.apache.pulsar.tests.integration.cli.PackagesCliTest"/>
<class name="org.apache.pulsar.tests.integration.cli.PulsarVersionTest"/>
<class name="org.apache.pulsar.tests.integration.cli.PerfToolTest"/>
</classes>
</test>
</suite>

0 comments on commit 0c1aab8

Please sign in to comment.