diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupReportLocalUsageTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupReportLocalUsageTest.java index 658b7c94165d93..0b16ef12d8996e 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupReportLocalUsageTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupReportLocalUsageTest.java @@ -72,34 +72,50 @@ public long computeLocalQuota(long confUsage, long myUsage, long[] allUsages) { rgConfig.setPublishRateInMsgs(2000); service.resourceGroupCreate(rgName, rgConfig); - org.apache.pulsar.broker.resourcegroup.ResourceGroup resourceGroup = service.resourceGroupGet(rgName); BytesAndMessagesCount bytesAndMessagesCount = new BytesAndMessagesCount(); bytesAndMessagesCount.bytes = 20; bytesAndMessagesCount.messages = 10; - resourceGroup.incrementLocalUsageStats(ResourceGroupMonitoringClass.Publish, bytesAndMessagesCount); + + org.apache.pulsar.broker.resourcegroup.ResourceGroup resourceGroup = service.resourceGroupGet(rgName); + for (ResourceGroupMonitoringClass value : ResourceGroupMonitoringClass.values()) { + resourceGroup.incrementLocalUsageStats(value, bytesAndMessagesCount); + } + + // Case1: Suppress report ResourceUsage. + needReport.set(false); ResourceUsage resourceUsage = new ResourceUsage(); resourceGroup.rgFillResourceUsage(resourceUsage); assertFalse(resourceUsage.hasDispatch()); assertFalse(resourceUsage.hasPublish()); + for (ResourceGroupMonitoringClass value : ResourceGroupMonitoringClass.values()) { + PerMonitoringClassFields publishMonitoredEntity = + resourceGroup.getMonitoredEntity(value); + assertEquals(publishMonitoredEntity.usedLocallySinceLastReport.messages, 0); + assertEquals(publishMonitoredEntity.usedLocallySinceLastReport.bytes, 0); + assertEquals(publishMonitoredEntity.totalUsedLocally.messages, 0); + assertEquals(publishMonitoredEntity.totalUsedLocally.bytes, 0); + assertEquals(publishMonitoredEntity.lastReportedValues.messages, 0); + assertEquals(publishMonitoredEntity.lastReportedValues.bytes, 0); + } - PerMonitoringClassFields publishMonitoredEntity = - resourceGroup.getMonitoredEntity(ResourceGroupMonitoringClass.Publish); - assertEquals(publishMonitoredEntity.usedLocallySinceLastReport.messages, bytesAndMessagesCount.messages); - assertEquals(publishMonitoredEntity.usedLocallySinceLastReport.bytes, bytesAndMessagesCount.bytes); - assertEquals(publishMonitoredEntity.totalUsedLocally.messages, 0); - assertEquals(publishMonitoredEntity.totalUsedLocally.bytes, 0); - assertEquals(publishMonitoredEntity.lastReportedValues.messages, 0); - assertEquals(publishMonitoredEntity.lastReportedValues.bytes, 0); - + // Case2: Report ResourceUsage. + for (ResourceGroupMonitoringClass value : ResourceGroupMonitoringClass.values()) { + resourceGroup.incrementLocalUsageStats(value, bytesAndMessagesCount); + } needReport.set(true); + resourceUsage = new ResourceUsage(); resourceGroup.rgFillResourceUsage(resourceUsage); assertTrue(resourceUsage.hasDispatch()); assertTrue(resourceUsage.hasPublish()); - assertEquals(publishMonitoredEntity.usedLocallySinceLastReport.messages, 0); - assertEquals(publishMonitoredEntity.usedLocallySinceLastReport.bytes, 0); - assertEquals(publishMonitoredEntity.totalUsedLocally.messages, bytesAndMessagesCount.messages); - assertEquals(publishMonitoredEntity.totalUsedLocally.bytes, bytesAndMessagesCount.bytes); - assertEquals(publishMonitoredEntity.lastReportedValues.messages, bytesAndMessagesCount.messages); - assertEquals(publishMonitoredEntity.lastReportedValues.bytes, bytesAndMessagesCount.bytes); + for (ResourceGroupMonitoringClass value : ResourceGroupMonitoringClass.values()) { + PerMonitoringClassFields publishMonitoredEntity = + resourceGroup.getMonitoredEntity(value); + assertEquals(publishMonitoredEntity.usedLocallySinceLastReport.messages, 0); + assertEquals(publishMonitoredEntity.usedLocallySinceLastReport.bytes, 0); + assertEquals(publishMonitoredEntity.totalUsedLocally.messages, bytesAndMessagesCount.messages); + assertEquals(publishMonitoredEntity.totalUsedLocally.bytes, bytesAndMessagesCount.bytes); + assertEquals(publishMonitoredEntity.lastReportedValues.messages, bytesAndMessagesCount.messages); + assertEquals(publishMonitoredEntity.lastReportedValues.bytes, bytesAndMessagesCount.bytes); + } } } \ No newline at end of file diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupServiceTest.java index 1468a2d5db85d4..0e8bbc34c78da3 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupServiceTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupServiceTest.java @@ -258,67 +258,6 @@ public void testResourceGroupOps() throws PulsarAdminException, InterruptedExcep Assert.assertEquals(rgs.getNumResourceGroups(), 0); } - @Test - public void testResourceGroupResetUsedLocallySinceLastReport() throws PulsarAdminException { - org.apache.pulsar.common.policies.data.ResourceGroup rgConfig = - new org.apache.pulsar.common.policies.data.ResourceGroup(); - final String rgName = UUID.randomUUID().toString(); - rgConfig.setPublishRateInBytes(15000L); - rgConfig.setPublishRateInMsgs(100); - rgConfig.setDispatchRateInBytes(40000L); - rgConfig.setDispatchRateInMsgs(500); - - this.pulsar.getResourceGroupServiceManager().resourceGroupCreate(rgName, rgConfig); - - ResourceGroup retRG = this.pulsar.getResourceGroupServiceManager().resourceGroupGet(rgName); - - PerMonitoringClassFields monClassFields = null; - // Case1: Suppress report ResourceUsage. - for (ResourceGroupMonitoringClass value : ResourceGroupMonitoringClass.values()) { - monClassFields = retRG.monitoringClassFields[value.ordinal()]; - monClassFields.usedLocallySinceLastReport.bytes = monClassFields.lastReportedValues.bytes = 10; - monClassFields.usedLocallySinceLastReport.messages = monClassFields.lastReportedValues.messages = 10; - monClassFields.lastResourceUsageFillTimeMSecsSinceEpoch = System.currentTimeMillis(); - } - - ResourceUsage resourceUsage = new ResourceUsage(); - retRG.rgFillResourceUsage(resourceUsage); - Assert.assertFalse(resourceUsage.hasDispatch()); - Assert.assertFalse(resourceUsage.hasPublish()); - - for (ResourceGroupMonitoringClass value : ResourceGroupMonitoringClass.values()) { - monClassFields = retRG.monitoringClassFields[value.ordinal()]; - Assert.assertEquals(monClassFields.usedLocallySinceLastReport.messages, 0L); - Assert.assertEquals(monClassFields.usedLocallySinceLastReport.bytes, 0L); - } - - // Case2: Report ResourceUsage. - for (ResourceGroupMonitoringClass value : ResourceGroupMonitoringClass.values()) { - monClassFields = retRG.monitoringClassFields[value.ordinal()]; - monClassFields.usedLocallySinceLastReport.bytes = monClassFields.lastReportedValues.bytes * 2; - monClassFields.usedLocallySinceLastReport.messages = monClassFields.lastReportedValues.messages * 2; - } - - resourceUsage = new ResourceUsage(); - retRG.rgFillResourceUsage(resourceUsage); - Assert.assertTrue(resourceUsage.hasDispatch()); - NetworkUsage dispatch = resourceUsage.getDispatch(); - Assert.assertNotNull(monClassFields); - Assert.assertEquals(dispatch.getBytesPerPeriod(), monClassFields.lastReportedValues.bytes); - Assert.assertEquals(dispatch.getMessagesPerPeriod(), monClassFields.lastReportedValues.messages); - - Assert.assertTrue(resourceUsage.hasPublish()); - NetworkUsage publish = resourceUsage.getPublish(); - Assert.assertEquals(publish.getBytesPerPeriod(), monClassFields.lastReportedValues.bytes); - Assert.assertEquals(publish.getMessagesPerPeriod(), monClassFields.lastReportedValues.messages); - - for (ResourceGroupMonitoringClass value : ResourceGroupMonitoringClass.values()) { - monClassFields = retRG.monitoringClassFields[value.ordinal()]; - Assert.assertEquals(monClassFields.usedLocallySinceLastReport.messages, 0L); - Assert.assertEquals(monClassFields.usedLocallySinceLastReport.bytes, 0L); - } - } - @Test public void testClose() throws Exception { ResourceGroupService service = new ResourceGroupService(pulsar, TimeUnit.MILLISECONDS, null, null);