Skip to content

Commit

Permalink
Fix NPE when ResourceGroupService execute scheduled task. (#17840)
Browse files Browse the repository at this point in the history
(cherry picked from commit 62d900f)
  • Loading branch information
Technoboy- authored and nicoloboschi committed Oct 25, 2022
1 parent 6c319f1 commit 42ccecf
Show file tree
Hide file tree
Showing 3 changed files with 58 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -408,6 +408,14 @@ public CompletableFuture<Void> closeAsync() {
}
this.resourceUsageTransportManager = null;
}
if (this.resourceGroupServiceManager != null) {
try {
this.resourceGroupServiceManager.close();
} catch (Exception e) {
LOG.warn("ResourceGroupServiceManager closing failed {}", e.getMessage());
}
this.resourceGroupServiceManager = null;
}

if (this.webService != null) {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.pulsar.broker.resourcegroup;

import static org.apache.pulsar.common.util.Runnables.catchingAndLoggingThrowables;
import com.google.common.annotations.VisibleForTesting;
import io.prometheus.client.Counter;
import io.prometheus.client.Summary;
import java.util.Map;
Expand Down Expand Up @@ -52,7 +53,7 @@
*
* @see PulsarService
*/
public class ResourceGroupService {
public class ResourceGroupService implements AutoCloseable{
/**
* Default constructor.
*/
Expand Down Expand Up @@ -302,6 +303,21 @@ public ResourceGroup getNamespaceResourceGroup(NamespaceName namespaceName) {
return this.namespaceToRGsMap.get(namespaceName);
}

@Override
public void close() throws Exception {
if (aggregateLocalUsagePeriodicTask != null) {
aggregateLocalUsagePeriodicTask.cancel(true);
}
if (calculateQuotaPeriodicTask != null) {
calculateQuotaPeriodicTask.cancel(true);
}
resourceGroupsMap.clear();
tenantToRGsMap.clear();
namespaceToRGsMap.clear();
topicProduceStats.clear();
topicConsumeStats.clear();
}

/**
* Increments usage stats for the resource groups associated with the given namespace and tenant.
* Expected to be called when a message is produced or consumed on a topic, or when we calculate
Expand Down Expand Up @@ -564,17 +580,17 @@ protected void aggregateResourceGroupLocalUsages() {
ServiceConfiguration config = pulsar.getConfiguration();
long newPeriodInSeconds = config.getResourceUsageTransportPublishIntervalInSecs();
if (newPeriodInSeconds != this.aggregateLocalUsagePeriodInSeconds) {
if (this.aggreagteLocalUsagePeriodicTask == null) {
if (this.aggregateLocalUsagePeriodicTask == null) {
log.error("aggregateResourceGroupLocalUsages: Unable to find running task to cancel when "
+ "publish period changed from {} to {} {}",
this.aggregateLocalUsagePeriodInSeconds, newPeriodInSeconds, timeUnitScale);
} else {
boolean cancelStatus = this.aggreagteLocalUsagePeriodicTask.cancel(true);
boolean cancelStatus = this.aggregateLocalUsagePeriodicTask.cancel(true);
log.info("aggregateResourceGroupLocalUsages: Got status={} in cancel of periodic "
+ "when publish period changed from {} to {} {}",
cancelStatus, this.aggregateLocalUsagePeriodInSeconds, newPeriodInSeconds, timeUnitScale);
}
this.aggreagteLocalUsagePeriodicTask = pulsar.getExecutor().scheduleAtFixedRate(
this.aggregateLocalUsagePeriodicTask = pulsar.getExecutor().scheduleAtFixedRate(
catchingAndLoggingThrowables(this::aggregateResourceGroupLocalUsages),
newPeriodInSeconds,
newPeriodInSeconds,
Expand Down Expand Up @@ -679,7 +695,7 @@ private void initialize() {
ServiceConfiguration config = this.pulsar.getConfiguration();
long periodInSecs = config.getResourceUsageTransportPublishIntervalInSecs();
this.aggregateLocalUsagePeriodInSeconds = this.resourceUsagePublishPeriodInSeconds = periodInSecs;
this.aggreagteLocalUsagePeriodicTask = this.pulsar.getExecutor().scheduleAtFixedRate(
this.aggregateLocalUsagePeriodicTask = this.pulsar.getExecutor().scheduleAtFixedRate(
catchingAndLoggingThrowables(this::aggregateResourceGroupLocalUsages),
periodInSecs,
periodInSecs,
Expand Down Expand Up @@ -736,7 +752,7 @@ private void checkRGCreateParams(String rgName, org.apache.pulsar.common.policie


// The task that periodically re-calculates the quota budget for local usage.
private ScheduledFuture<?> aggreagteLocalUsagePeriodicTask;
private ScheduledFuture<?> aggregateLocalUsagePeriodicTask;
private long aggregateLocalUsagePeriodInSeconds;

// The task that periodically re-calculates the quota budget for local usage.
Expand Down Expand Up @@ -829,4 +845,24 @@ private void checkRGCreateParams(String rgName, org.apache.pulsar.common.policie
.name("pulsar_resource_group_calculate_quota_secs")
.help("Time required to calculate quota of all resource groups, in seconds.")
.register();

@VisibleForTesting
ConcurrentHashMap getTopicConsumeStats() {
return this.topicConsumeStats;
}

@VisibleForTesting
ConcurrentHashMap getTopicProduceStats() {
return this.topicProduceStats;
}

@VisibleForTesting
ScheduledFuture<?> getAggregateLocalUsagePeriodicTask() {
return this.aggregateLocalUsagePeriodicTask;
}

@VisibleForTesting
ScheduledFuture<?> getCalculateQuotaPeriodicTask() {
return this.calculateQuotaPeriodicTask;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -257,6 +257,14 @@ public void testResourceGroupOps() throws PulsarAdminException, InterruptedExcep
Assert.assertEquals(rgs.getNumResourceGroups(), 0);
}

@Test
public void testClose() throws Exception {
ResourceGroupService service = new ResourceGroupService(pulsar, TimeUnit.MILLISECONDS, null, null);
service.close();
Assert.assertTrue(service.getAggregateLocalUsagePeriodicTask().isCancelled());
Assert.assertTrue(service.getCalculateQuotaPeriodicTask().isCancelled());
}

private ResourceGroupService rgs;
int numAnonymousQuotaCalculations;

Expand Down

0 comments on commit 42ccecf

Please sign in to comment.