Skip to content

Commit

Permalink
Fix test
Browse files Browse the repository at this point in the history
Signed-off-by: Zixuan Liu <[email protected]>
  • Loading branch information
nodece committed May 8, 2024
1 parent 7e747ff commit e21f48d
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 78 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -72,34 +72,50 @@ public long computeLocalQuota(long confUsage, long myUsage, long[] allUsages) {
rgConfig.setPublishRateInMsgs(2000);
service.resourceGroupCreate(rgName, rgConfig);

org.apache.pulsar.broker.resourcegroup.ResourceGroup resourceGroup = service.resourceGroupGet(rgName);
BytesAndMessagesCount bytesAndMessagesCount = new BytesAndMessagesCount();
bytesAndMessagesCount.bytes = 20;
bytesAndMessagesCount.messages = 10;
resourceGroup.incrementLocalUsageStats(ResourceGroupMonitoringClass.Publish, bytesAndMessagesCount);

org.apache.pulsar.broker.resourcegroup.ResourceGroup resourceGroup = service.resourceGroupGet(rgName);
for (ResourceGroupMonitoringClass value : ResourceGroupMonitoringClass.values()) {
resourceGroup.incrementLocalUsageStats(value, bytesAndMessagesCount);
}

// Case1: Suppress report ResourceUsage.
needReport.set(false);
ResourceUsage resourceUsage = new ResourceUsage();
resourceGroup.rgFillResourceUsage(resourceUsage);
assertFalse(resourceUsage.hasDispatch());
assertFalse(resourceUsage.hasPublish());
for (ResourceGroupMonitoringClass value : ResourceGroupMonitoringClass.values()) {
PerMonitoringClassFields publishMonitoredEntity =
resourceGroup.getMonitoredEntity(value);
assertEquals(publishMonitoredEntity.usedLocallySinceLastReport.messages, 0);
assertEquals(publishMonitoredEntity.usedLocallySinceLastReport.bytes, 0);
assertEquals(publishMonitoredEntity.totalUsedLocally.messages, 0);
assertEquals(publishMonitoredEntity.totalUsedLocally.bytes, 0);
assertEquals(publishMonitoredEntity.lastReportedValues.messages, 0);
assertEquals(publishMonitoredEntity.lastReportedValues.bytes, 0);
}

PerMonitoringClassFields publishMonitoredEntity =
resourceGroup.getMonitoredEntity(ResourceGroupMonitoringClass.Publish);
assertEquals(publishMonitoredEntity.usedLocallySinceLastReport.messages, bytesAndMessagesCount.messages);
assertEquals(publishMonitoredEntity.usedLocallySinceLastReport.bytes, bytesAndMessagesCount.bytes);
assertEquals(publishMonitoredEntity.totalUsedLocally.messages, 0);
assertEquals(publishMonitoredEntity.totalUsedLocally.bytes, 0);
assertEquals(publishMonitoredEntity.lastReportedValues.messages, 0);
assertEquals(publishMonitoredEntity.lastReportedValues.bytes, 0);

// Case2: Report ResourceUsage.
for (ResourceGroupMonitoringClass value : ResourceGroupMonitoringClass.values()) {
resourceGroup.incrementLocalUsageStats(value, bytesAndMessagesCount);
}
needReport.set(true);
resourceUsage = new ResourceUsage();
resourceGroup.rgFillResourceUsage(resourceUsage);
assertTrue(resourceUsage.hasDispatch());
assertTrue(resourceUsage.hasPublish());
assertEquals(publishMonitoredEntity.usedLocallySinceLastReport.messages, 0);
assertEquals(publishMonitoredEntity.usedLocallySinceLastReport.bytes, 0);
assertEquals(publishMonitoredEntity.totalUsedLocally.messages, bytesAndMessagesCount.messages);
assertEquals(publishMonitoredEntity.totalUsedLocally.bytes, bytesAndMessagesCount.bytes);
assertEquals(publishMonitoredEntity.lastReportedValues.messages, bytesAndMessagesCount.messages);
assertEquals(publishMonitoredEntity.lastReportedValues.bytes, bytesAndMessagesCount.bytes);
for (ResourceGroupMonitoringClass value : ResourceGroupMonitoringClass.values()) {
PerMonitoringClassFields publishMonitoredEntity =
resourceGroup.getMonitoredEntity(value);
assertEquals(publishMonitoredEntity.usedLocallySinceLastReport.messages, 0);
assertEquals(publishMonitoredEntity.usedLocallySinceLastReport.bytes, 0);
assertEquals(publishMonitoredEntity.totalUsedLocally.messages, bytesAndMessagesCount.messages);
assertEquals(publishMonitoredEntity.totalUsedLocally.bytes, bytesAndMessagesCount.bytes);
assertEquals(publishMonitoredEntity.lastReportedValues.messages, bytesAndMessagesCount.messages);
assertEquals(publishMonitoredEntity.lastReportedValues.bytes, bytesAndMessagesCount.bytes);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -258,67 +258,6 @@ public void testResourceGroupOps() throws PulsarAdminException, InterruptedExcep
Assert.assertEquals(rgs.getNumResourceGroups(), 0);
}

@Test
public void testResourceGroupResetUsedLocallySinceLastReport() throws PulsarAdminException {
org.apache.pulsar.common.policies.data.ResourceGroup rgConfig =
new org.apache.pulsar.common.policies.data.ResourceGroup();
final String rgName = UUID.randomUUID().toString();
rgConfig.setPublishRateInBytes(15000L);
rgConfig.setPublishRateInMsgs(100);
rgConfig.setDispatchRateInBytes(40000L);
rgConfig.setDispatchRateInMsgs(500);

this.pulsar.getResourceGroupServiceManager().resourceGroupCreate(rgName, rgConfig);

ResourceGroup retRG = this.pulsar.getResourceGroupServiceManager().resourceGroupGet(rgName);

PerMonitoringClassFields monClassFields = null;
// Case1: Suppress report ResourceUsage.
for (ResourceGroupMonitoringClass value : ResourceGroupMonitoringClass.values()) {
monClassFields = retRG.monitoringClassFields[value.ordinal()];
monClassFields.usedLocallySinceLastReport.bytes = monClassFields.lastReportedValues.bytes = 10;
monClassFields.usedLocallySinceLastReport.messages = monClassFields.lastReportedValues.messages = 10;
monClassFields.lastResourceUsageFillTimeMSecsSinceEpoch = System.currentTimeMillis();
}

ResourceUsage resourceUsage = new ResourceUsage();
retRG.rgFillResourceUsage(resourceUsage);
Assert.assertFalse(resourceUsage.hasDispatch());
Assert.assertFalse(resourceUsage.hasPublish());

for (ResourceGroupMonitoringClass value : ResourceGroupMonitoringClass.values()) {
monClassFields = retRG.monitoringClassFields[value.ordinal()];
Assert.assertEquals(monClassFields.usedLocallySinceLastReport.messages, 0L);
Assert.assertEquals(monClassFields.usedLocallySinceLastReport.bytes, 0L);
}

// Case2: Report ResourceUsage.
for (ResourceGroupMonitoringClass value : ResourceGroupMonitoringClass.values()) {
monClassFields = retRG.monitoringClassFields[value.ordinal()];
monClassFields.usedLocallySinceLastReport.bytes = monClassFields.lastReportedValues.bytes * 2;
monClassFields.usedLocallySinceLastReport.messages = monClassFields.lastReportedValues.messages * 2;
}

resourceUsage = new ResourceUsage();
retRG.rgFillResourceUsage(resourceUsage);
Assert.assertTrue(resourceUsage.hasDispatch());
NetworkUsage dispatch = resourceUsage.getDispatch();
Assert.assertNotNull(monClassFields);
Assert.assertEquals(dispatch.getBytesPerPeriod(), monClassFields.lastReportedValues.bytes);
Assert.assertEquals(dispatch.getMessagesPerPeriod(), monClassFields.lastReportedValues.messages);

Assert.assertTrue(resourceUsage.hasPublish());
NetworkUsage publish = resourceUsage.getPublish();
Assert.assertEquals(publish.getBytesPerPeriod(), monClassFields.lastReportedValues.bytes);
Assert.assertEquals(publish.getMessagesPerPeriod(), monClassFields.lastReportedValues.messages);

for (ResourceGroupMonitoringClass value : ResourceGroupMonitoringClass.values()) {
monClassFields = retRG.monitoringClassFields[value.ordinal()];
Assert.assertEquals(monClassFields.usedLocallySinceLastReport.messages, 0L);
Assert.assertEquals(monClassFields.usedLocallySinceLastReport.bytes, 0L);
}
}

@Test
public void testClose() throws Exception {
ResourceGroupService service = new ResourceGroupService(pulsar, TimeUnit.MILLISECONDS, null, null);
Expand Down

0 comments on commit e21f48d

Please sign in to comment.