Skip to content

Commit

Permalink
[monitoring][broker][fix] Fix EntryFilter stats (#17605)
Browse files Browse the repository at this point in the history
* fix entryFilter stats

* fix test

* add test comment

* review fix
  • Loading branch information
tjiuming authored Sep 21, 2022
1 parent 260f5c6 commit 8441f67
Show file tree
Hide file tree
Showing 2 changed files with 64 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -107,8 +107,9 @@ public int filterEntriesForConsumer(Optional<MessageMetadata[]> optMetadataArray
long totalBytes = 0;
int totalChunkedMessages = 0;
int totalEntries = 0;
List<Position> entriesToFiltered = CollectionUtils.isNotEmpty(entryFilters) ? new ArrayList<>() : null;
List<PositionImpl> entriesToRedeliver = CollectionUtils.isNotEmpty(entryFilters) ? new ArrayList<>() : null;
final boolean hasFilter = CollectionUtils.isNotEmpty(entryFilters);
List<Position> entriesToFiltered = hasFilter ? new ArrayList<>() : null;
List<PositionImpl> entriesToRedeliver = hasFilter ? new ArrayList<>() : null;
for (int i = 0, entriesSize = entries.size(); i < entriesSize; i++) {
final Entry entry = entries.get(i);
if (entry == null) {
Expand All @@ -123,18 +124,24 @@ public int filterEntriesForConsumer(Optional<MessageMetadata[]> optMetadataArray
);

int entryMsgCnt = msgMetadata == null ? 1 : msgMetadata.getNumMessagesInBatch();
this.filterProcessedMsgs.add(entryMsgCnt);
if (hasFilter) {
this.filterProcessedMsgs.add(entryMsgCnt);
}

EntryFilter.FilterResult filterResult = runFiltersForEntry(entry, msgMetadata, consumer);
if (filterResult == EntryFilter.FilterResult.REJECT) {
entriesToFiltered.add(entry.getPosition());
entries.set(i, null);
// FilterResult will be always `ACCEPTED` when there is No Filter
// dont need to judge whether `hasFilter` is true or not.
this.filterRejectedMsgs.add(entryMsgCnt);
entry.release();
continue;
} else if (filterResult == EntryFilter.FilterResult.RESCHEDULE) {
entriesToRedeliver.add((PositionImpl) entry.getPosition());
entries.set(i, null);
// FilterResult will be always `ACCEPTED` when there is No Filter
// dont need to judge whether `hasFilter` is true or not.
this.filterRescheduledMsgs.add(entryMsgCnt);
entry.release();
continue;
Expand Down Expand Up @@ -176,7 +183,9 @@ public int filterEntriesForConsumer(Optional<MessageMetadata[]> optMetadataArray
continue;
}

this.filterAcceptedMsgs.add(entryMsgCnt);
if (hasFilter) {
this.filterAcceptedMsgs.add(entryMsgCnt);
}

totalEntries++;
int batchSize = msgMetadata.getNumMessagesInBatch();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -148,16 +148,21 @@ public void testNonContiguousDeletedMessagesRanges() throws Exception {
@DataProvider(name = "testSubscriptionMetrics")
public Object[][] topicAndSubscription() {
return new Object[][]{
{"persistent://my-property/my-ns/testSubscriptionStats-" + UUID.randomUUID(), "my-sub1", true},
{"non-persistent://my-property/my-ns/testSubscriptionStats-" + UUID.randomUUID(), "my-sub2", true},
{"persistent://my-property/my-ns/testSubscriptionStats-" + UUID.randomUUID(), "my-sub3", false},
{"non-persistent://my-property/my-ns/testSubscriptionStats-" + UUID.randomUUID(), "my-sub4", false},
{"persistent://my-property/my-ns/testSubscriptionStats-" + UUID.randomUUID(), "my-sub1", true, true},
{"non-persistent://my-property/my-ns/testSubscriptionStats-" + UUID.randomUUID(), "my-sub2", true, true},
{"persistent://my-property/my-ns/testSubscriptionStats-" + UUID.randomUUID(), "my-sub3", false, true},
{"non-persistent://my-property/my-ns/testSubscriptionStats-" + UUID.randomUUID(), "my-sub4", false, true},

{"persistent://my-property/my-ns/testSubscriptionStats-" + UUID.randomUUID(), "my-sub1", true, false},
{"non-persistent://my-property/my-ns/testSubscriptionStats-" + UUID.randomUUID(), "my-sub2", true, false},
{"persistent://my-property/my-ns/testSubscriptionStats-" + UUID.randomUUID(), "my-sub3", false, false},
{"non-persistent://my-property/my-ns/testSubscriptionStats-" + UUID.randomUUID(), "my-sub4", false, false},
};
}

@Test(dataProvider = "testSubscriptionMetrics")
public void testSubscriptionStats(final String topic, final String subName, boolean enableTopicStats)
throws Exception {
public void testSubscriptionStats(final String topic, final String subName, boolean enableTopicStats,
boolean setFilter) throws Exception {
@Cleanup
Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
.topic(topic)
Expand All @@ -175,12 +180,15 @@ public void testSubscriptionStats(final String topic, final String subName, bool
Dispatcher dispatcher = pulsar.getBrokerService().getTopic(topic, false).get()
.get().getSubscription(subName).getDispatcher();

Field field = EntryFilterSupport.class.getDeclaredField("entryFilters");
field.setAccessible(true);
NarClassLoader narClassLoader = mock(NarClassLoader.class);
EntryFilter filter1 = new EntryFilterTest();
EntryFilterWithClassLoader loader1 = spyWithClassAndConstructorArgs(EntryFilterWithClassLoader.class, filter1, narClassLoader);
field.set(dispatcher, ImmutableList.of(loader1));
if (setFilter) {
Field field = EntryFilterSupport.class.getDeclaredField("entryFilters");
field.setAccessible(true);
NarClassLoader narClassLoader = mock(NarClassLoader.class);
EntryFilter filter1 = new EntryFilterTest();
EntryFilterWithClassLoader loader1 =
spyWithClassAndConstructorArgs(EntryFilterWithClassLoader.class, filter1, narClassLoader);
field.set(dispatcher, ImmutableList.of(loader1));
}

for (int i = 0; i < 100; i++) {
producer.newMessage().property("ACCEPT", " ").value(UUID.randomUUID().toString()).send();
Expand Down Expand Up @@ -233,10 +241,18 @@ public void testSubscriptionStats(final String topic, final String subName, bool
.filter(m -> m.tags.get("subscription").equals(subName) && m.tags.get("topic").equals(topic))
.mapToDouble(m-> m.value).sum();

Assert.assertEquals(filterAccepted, 100);
if (isPersistent) {
Assert.assertEquals(filterRejected, 100);
Assert.assertEquals(throughFilter, filterAccepted + filterRejected + filterRescheduled, 0.01 * throughFilter);
if (setFilter) {
Assert.assertEquals(filterAccepted, 100);
if (isPersistent) {
Assert.assertEquals(filterRejected, 100);
// Only works on the test, if there are some markers, the filterProcessCount will be not equal with rejectedCount + rescheduledCount + acceptCount
Assert.assertEquals(throughFilter, filterAccepted + filterRejected + filterRescheduled, 0.01 * throughFilter);
}
} else {
Assert.assertEquals(throughFilter, 0D);
Assert.assertEquals(filterAccepted, 0D);
Assert.assertEquals(filterRejected, 0D);
Assert.assertEquals(filterRescheduled, 0D);
}
} else {
Assert.assertEquals(throughFilterMetrics.size(), 0);
Expand All @@ -245,22 +261,32 @@ public void testSubscriptionStats(final String topic, final String subName, bool
Assert.assertEquals(rescheduledMetrics.size(), 0);
}

testSubscriptionStatsAdminApi(topic, subName);
testSubscriptionStatsAdminApi(topic, subName, setFilter);
}

private void testSubscriptionStatsAdminApi(String topic, String subName) throws Exception {
private void testSubscriptionStatsAdminApi(String topic, String subName, boolean setFilter) throws Exception {
boolean persistent = TopicName.get(topic).isPersistent();
TopicStats topicStats = admin.topics().getStats(topic);
SubscriptionStats stats = topicStats.getSubscriptions().get(subName);
Assert.assertNotNull(stats);

Assert.assertEquals(stats.getFilterAcceptedMsgCount(), 100);
if (persistent) {
Assert.assertEquals(stats.getFilterRejectedMsgCount(), 100);
Assert.assertEquals(stats.getFilterProcessedMsgCount(),
stats.getFilterAcceptedMsgCount() + stats.getFilterRejectedMsgCount()
+ stats.getFilterRescheduledMsgCount(),
0.01 * stats.getFilterProcessedMsgCount());
if (setFilter) {
Assert.assertEquals(stats.getFilterAcceptedMsgCount(), 100);
if (persistent) {
Assert.assertEquals(stats.getFilterRejectedMsgCount(), 100);
// Only works on the test, if there are some markers, the filterProcessCount will be not equal with rejectedCount + rescheduledCount + acceptCount
Assert.assertEquals(stats.getFilterProcessedMsgCount(),
stats.getFilterAcceptedMsgCount() + stats.getFilterRejectedMsgCount()
+ stats.getFilterRescheduledMsgCount(),
0.01 * stats.getFilterProcessedMsgCount());
}
} else {
Assert.assertEquals(stats.getFilterAcceptedMsgCount(), 0L);
if (persistent) {
Assert.assertEquals(stats.getFilterRejectedMsgCount(), 0L);
Assert.assertEquals(stats.getFilterAcceptedMsgCount(), 0L);
Assert.assertEquals(stats.getFilterRescheduledMsgCount(), 0L);
}
}
}
}

0 comments on commit 8441f67

Please sign in to comment.