Skip to content

Commit

Permalink
add redelivery rates to consumer and subscription stats
Browse files Browse the repository at this point in the history
  • Loading branch information
sschepens committed Oct 31, 2016
1 parent 340dc8d commit 96226ec
Show file tree
Hide file tree
Showing 6 changed files with 123 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ public class Consumer {
private final long consumerId;
private final String consumerName;
private final Rate msgOut;
private final Rate msgRedeliver;

// Represents how many messages we can safely send to the consumer without
// overflowing its receiving queue. The consumer will use Flow commands to
Expand Down Expand Up @@ -90,6 +91,7 @@ public Consumer(Subscription subscription, SubType subType, long consumerId, Str
this.maxUnackedMessages = maxUnackedMessages;
this.cnx = cnx;
this.msgOut = new Rate();
this.msgRedeliver = new Rate();
this.appId = appId;

stats = new ConsumerStats();
Expand Down Expand Up @@ -357,8 +359,10 @@ private boolean shouldBlockConsumerOnUnackMsgs() {

public void updateRates() {
msgOut.calculateRate();
msgRedeliver.calculateRate();
stats.msgRateOut = msgOut.getRate();
stats.msgThroughputOut = msgOut.getValueRate();
stats.msgRateRedeliver = msgRedeliver.getRate();
}

public ConsumerStats getStats() {
Expand Down Expand Up @@ -452,6 +456,11 @@ public void redeliverUnacknowledgedMessages() {
subscription.redeliverUnacknowledgedMessages(this);
flowConsumerBlockedPermits(this);
if (pendingAcks != null) {
int totalRedeliveryMessages = 0;
for (Integer batchSize : pendingAcks.values()) {
totalRedeliveryMessages += batchSize;
}
msgRedeliver.recordMultipleEvents(totalRedeliveryMessages, totalRedeliveryMessages);
pendingAcks.clear();
}

Expand All @@ -474,6 +483,7 @@ public void redeliverUnacknowledgedMessages(List<MessageIdData> messageIds) {
blockedConsumerOnUnackedMsgs = false;

subscription.redeliverUnacknowledgedMessages(this, pendingPositions);
msgRedeliver.recordMultipleEvents(totalRedeliveryMessages, totalRedeliveryMessages);

int numberOfBlockedPermits = Math.min(totalRedeliveryMessages,
permitsReceivedWhileConsumerBlocked.get());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -575,6 +575,8 @@ public PersistentSubscriptionStats getStats() {
subStats.consumers.add(consumerStats);
subStats.msgRateOut += consumerStats.msgRateOut;
subStats.msgThroughputOut += consumerStats.msgThroughputOut;
subStats.msgRateRedeliver += consumerStats.msgRateRedeliver;
subStats.unackedMessages += consumerStats.unackedMessages;
});
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -817,6 +817,8 @@ public void updateRates(NamespaceStats nsStats, NamespaceBundleStats bundleStats
subscriptions.forEach((subscriptionName, subscription) -> {
double subMsgRateOut = 0;
double subMsgThroughputOut = 0;
double subMsgRateRedeliver = 0;
long subUnackedMessages = 0;

// Start subscription name & consumers
try {
Expand All @@ -834,6 +836,8 @@ public void updateRates(NamespaceStats nsStats, NamespaceBundleStats bundleStats
ConsumerStats consumerStats = consumer.getStats();
subMsgRateOut += consumerStats.msgRateOut;
subMsgThroughputOut += consumerStats.msgThroughputOut;
subMsgRateRedeliver += consumerStats.msgRateRedeliver;
subUnackedMessages += consumerStats.unackedMessages;

// Populate consumer specific stats here
destStatsStream.startObject();
Expand All @@ -844,6 +848,7 @@ public void updateRates(NamespaceStats nsStats, NamespaceBundleStats bundleStats
destStatsStream.writePair("connectedSince", consumerStats.connectedSince);
destStatsStream.writePair("msgRateOut", consumerStats.msgRateOut);
destStatsStream.writePair("msgThroughputOut", consumerStats.msgThroughputOut);
destStatsStream.writePair("msgRateRedeliver", consumerStats.msgRateRedeliver);
destStatsStream.endObject();
}

Expand All @@ -855,6 +860,8 @@ public void updateRates(NamespaceStats nsStats, NamespaceBundleStats bundleStats
destStatsStream.writePair("msgRateExpired", subscription.getExpiredMessageRate());
destStatsStream.writePair("msgRateOut", subMsgRateOut);
destStatsStream.writePair("msgThroughputOut", subMsgThroughputOut);
destStatsStream.writePair("msgRateRedeliver", subMsgRateRedeliver);
destStatsStream.writePair("unackedMessages", subUnackedMessages);
destStatsStream.writePair("type", subscription.getTypeString());

// Close consumers
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,96 @@ public void testBrokerServicePersistentTopicStats() throws Exception {
assertEquals(subStats.msgBacklog, 0);
}

@Test
public void testBrokerServicePersistentRedeliverTopicStats() throws Exception {
final String topicName = "persistent://prop/use/ns-abc/successSharedTopic";
final String subName = "successSharedSub";

PersistentTopicStats stats;
PersistentSubscriptionStats subStats;

ConsumerConfiguration conf = new ConsumerConfiguration();
conf.setSubscriptionType(SubscriptionType.Shared);
Consumer consumer = pulsarClient.subscribe(topicName, subName, conf);
Thread.sleep(ASYNC_EVENT_COMPLETION_WAIT);

PersistentTopic topicRef = (PersistentTopic) pulsar.getBrokerService().getTopicReference(topicName);
assertNotNull(topicRef);

rolloverPerIntervalStats();
stats = topicRef.getStats();
subStats = stats.subscriptions.values().iterator().next();

// subscription stats
assertEquals(stats.subscriptions.keySet().size(), 1);
assertEquals(subStats.msgBacklog, 0);
assertEquals(subStats.consumers.size(), 1);

Producer producer = pulsarClient.createProducer(topicName);
Thread.sleep(ASYNC_EVENT_COMPLETION_WAIT);

for (int i = 0; i < 10; i++) {
String message = "my-message-" + i;
producer.send(message.getBytes());
}
Thread.sleep(ASYNC_EVENT_COMPLETION_WAIT);

rolloverPerIntervalStats();
stats = topicRef.getStats();
subStats = stats.subscriptions.values().iterator().next();

// publisher stats
assertEquals(subStats.msgBacklog, 10);
assertEquals(stats.publishers.size(), 1);
assertTrue(stats.publishers.get(0).msgRateIn > 0.0);
assertTrue(stats.publishers.get(0).msgThroughputIn > 0.0);
assertTrue(stats.publishers.get(0).averageMsgSize > 0.0);

// aggregated publish stats
assertEquals(stats.msgRateIn, stats.publishers.get(0).msgRateIn);
assertEquals(stats.msgThroughputIn, stats.publishers.get(0).msgThroughputIn);
double diff = stats.averageMsgSize - stats.publishers.get(0).averageMsgSize;
assertTrue(Math.abs(diff) < 0.000001);

// consumer stats
assertTrue(subStats.consumers.get(0).msgRateOut > 0.0);
assertTrue(subStats.consumers.get(0).msgThroughputOut > 0.0);
assertEquals(subStats.msgRateRedeliver, 0.0);
assertEquals(subStats.consumers.get(0).unackedMessages, 10);

// aggregated consumer stats
assertEquals(subStats.msgRateOut, subStats.consumers.get(0).msgRateOut);
assertEquals(subStats.msgThroughputOut, subStats.consumers.get(0).msgThroughputOut);
assertEquals(subStats.msgRateRedeliver, subStats.consumers.get(0).msgRateRedeliver);
assertEquals(stats.msgRateOut, subStats.consumers.get(0).msgRateOut);
assertEquals(stats.msgThroughputOut, subStats.consumers.get(0).msgThroughputOut);
assertEquals(subStats.msgRateRedeliver, subStats.consumers.get(0).msgRateRedeliver);
assertEquals(subStats.unackedMessages, subStats.consumers.get(0).unackedMessages);

consumer.redeliverUnacknowledgedMessages();
Thread.sleep(ASYNC_EVENT_COMPLETION_WAIT);

rolloverPerIntervalStats();
stats = topicRef.getStats();
subStats = stats.subscriptions.values().iterator().next();
assertTrue(subStats.msgRateRedeliver > 0.0);
assertEquals(subStats.msgRateRedeliver, subStats.consumers.get(0).msgRateRedeliver);

Message msg;
for (int i = 0; i < 10; i++) {
msg = consumer.receive();
consumer.acknowledge(msg);
}
consumer.close();
Thread.sleep(ASYNC_EVENT_COMPLETION_WAIT);

rolloverPerIntervalStats();
stats = topicRef.getStats();
subStats = stats.subscriptions.values().iterator().next();

assertEquals(subStats.msgBacklog, 0);
}

@Test
public void testBrokerStatsMetrics() throws Exception {
final String topicName = "persistent://prop/use/ns-abc/newTopic";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@ public class ConsumerStats {
/** Total throughput delivered to the consumer. bytes/s */
public double msgThroughputOut;

/** Total rate of messages redelivered by this consumer. msg/s */
public double msgRateRedeliver;

/** Name of the consumer */
public String consumerName;

Expand All @@ -45,6 +48,7 @@ public ConsumerStats add(ConsumerStats stats) {
checkNotNull(stats);
this.msgRateOut += stats.msgRateOut;
this.msgThroughputOut += stats.msgThroughputOut;
this.msgRateRedeliver += stats.msgRateRedeliver;
this.availablePermits += stats.availablePermits;
this.unackedMessages += stats.unackedMessages;
return this;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,15 @@ public class PersistentSubscriptionStats {
/** Total throughput delivered on this subscription. bytes/s */
public double msgThroughputOut;

/** Total rate of messages redelivered on this subscription. msg/s */
public double msgRateRedeliver;

/** Number of messages in the subscription backlog */
public long msgBacklog;

/** Number of unacknowledged messages for the subscription */
public long unackedMessages;

/** whether this subscription is Exclusive or Shared or Failover */
public SubType type;

Expand All @@ -50,7 +56,9 @@ public PersistentSubscriptionStats() {
public void reset() {
msgRateOut = 0;
msgThroughputOut = 0;
msgRateRedeliver = 0;
msgBacklog = 0;
unackedMessages = 0;
msgRateExpired = 0;
consumers.clear();
}
Expand All @@ -61,7 +69,9 @@ public PersistentSubscriptionStats add(PersistentSubscriptionStats stats) {
checkNotNull(stats);
this.msgRateOut += stats.msgRateOut;
this.msgThroughputOut += stats.msgThroughputOut;
this.msgRateRedeliver += stats.msgRateRedeliver;
this.msgBacklog += stats.msgBacklog;
this.unackedMessages += stats.unackedMessages;
this.msgRateExpired += stats.msgRateExpired;
if (this.consumers.size() != stats.consumers.size()) {
for (int i = 0; i < stats.consumers.size(); i++) {
Expand Down

0 comments on commit 96226ec

Please sign in to comment.