Skip to content

Commit

Permalink
Merge branch 'apache:master' into improve/customConsumerExecutor
Browse files Browse the repository at this point in the history
  • Loading branch information
AuroraTwinkle authored Jul 23, 2024
2 parents d07b935 + fca9c5c commit 3a8af2f
Show file tree
Hide file tree
Showing 47 changed files with 2,047 additions and 114 deletions.
8 changes: 8 additions & 0 deletions conf/broker.conf
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,14 @@ skipBrokerShutdownOnOOM=false
# Factory class-name to create topic with custom workflow
topicFactoryClassName=

# Max capacity of the topic name cache. -1 means unlimited cache; 0 means broker will clear all cache
# per "maxSecondsToClearTopicNameCache", it does not mean broker will not cache TopicName.
topicNameCacheMaxCapacity=100000

# A Specifies the minimum number of seconds that the topic name stays in memory, to avoid clear cache frequently when
# there are too many topics are in use.
maxSecondsToClearTopicNameCache=7200

# Enable backlog quota check. Enforces action on topic when the quota is reached
backlogQuotaCheckEnabled=true

Expand Down
4 changes: 2 additions & 2 deletions distribution/server/src/assemble/LICENSE.bin.txt
Original file line number Diff line number Diff line change
Expand Up @@ -480,8 +480,8 @@ The Apache Software License, Version 2.0
* Prometheus
- io.prometheus-simpleclient_httpserver-0.16.0.jar
* Oxia
- io.streamnative.oxia-oxia-client-api-0.3.0.jar
- io.streamnative.oxia-oxia-client-0.3.0.jar
- io.streamnative.oxia-oxia-client-api-0.3.1.jar
- io.streamnative.oxia-oxia-client-0.3.1.jar
* OpenHFT
- net.openhft-zero-allocation-hashing-0.16.jar
* Java JSON WebTokens
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3472,6 +3472,19 @@ public LongPairRangeSet<Position> getIndividuallyDeletedMessagesSet() {
return individualDeletedMessages;
}

public Position processIndividuallyDeletedMessagesAndGetMarkDeletedPosition(
LongPairRangeSet.RangeProcessor<Position> processor) {
final Position mdp;
lock.readLock().lock();
try {
mdp = markDeletePosition;
individualDeletedMessages.forEach(processor);
} finally {
lock.readLock().unlock();
}
return mdp;
}

public boolean isMessageDeleted(Position position) {
lock.readLock().lock();
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3497,7 +3497,7 @@ private CompletableFuture<Void> completeLedgerInfoForOffloaded(long ledgerId, UU
* the position range
* @return the count of entries
*/
long getNumberOfEntries(Range<Position> range) {
public long getNumberOfEntries(Range<Position> range) {
Position fromPosition = range.lowerEndpoint();
boolean fromIncluded = range.lowerBoundType() == BoundType.CLOSED;
Position toPosition = range.upperEndpoint();
Expand Down
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -252,7 +252,7 @@ flexible messaging model and an intuitive client API.</description>
<apache-http-client.version>4.5.13</apache-http-client.version>
<apache-httpcomponents.version>4.4.15</apache-httpcomponents.version>
<jetcd.version>0.7.7</jetcd.version>
<oxia.version>0.3.0</oxia.version>
<oxia.version>0.3.1</oxia.version>
<snakeyaml.version>2.0</snakeyaml.version>
<ant.version>1.10.12</ant.version>
<seancfoley.ipaddress.version>5.5.0</seancfoley.ipaddress.version>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -594,6 +594,21 @@ The max allowed delay for delayed delivery (in milliseconds). If the broker rece
)
private boolean backlogQuotaCheckEnabled = true;

@FieldContext(
dynamic = true,
category = CATEGORY_POLICIES,
doc = "Max capacity of the topic name cache. -1 means unlimited cache; 0 means broker will clear all cache"
+ " per maxSecondsToClearTopicNameCache, it does not mean broker will not cache TopicName."
)
private int topicNameCacheMaxCapacity = 100_000;

@FieldContext(
category = CATEGORY_POLICIES,
doc = "A Specifies the minimum number of seconds that the topic name stays in memory, to avoid clear cache"
+ " frequently when there are too many topics are in use."
)
private int maxSecondsToClearTopicNameCache = 3600 * 2;

@FieldContext(
category = CATEGORY_POLICIES,
doc = "Whether to enable precise time based backlog quota check. "
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,10 @@
package org.apache.pulsar.broker.web;

import com.google.common.util.concurrent.RateLimiter;
import io.opentelemetry.api.common.AttributeKey;
import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.api.metrics.LongCounter;
import io.opentelemetry.api.metrics.Meter;
import io.prometheus.client.Counter;
import java.io.IOException;
import javax.servlet.Filter;
Expand All @@ -33,15 +37,32 @@ public class RateLimitingFilter implements Filter {

private final RateLimiter limiter;

public RateLimitingFilter(double rateLimit) {
limiter = RateLimiter.create(rateLimit);
public static final String RATE_LIMIT_REQUEST_COUNT_METRIC_NAME =
"pulsar.web.filter.rate_limit.request.count";
private final LongCounter rateLimitRequestCounter;

public static final AttributeKey<String> RATE_LIMIT_RESULT =
AttributeKey.stringKey("pulsar.web.filter.rate_limit.result");
public enum Result {
ACCEPTED,
REJECTED;
public final Attributes attributes = Attributes.of(RATE_LIMIT_RESULT, name().toLowerCase());
}

@Deprecated
private static final Counter httpRejectedRequests = Counter.build()
.name("pulsar_broker_http_rejected_requests")
.help("Counter of HTTP requests rejected by rate limiting")
.register();

public RateLimitingFilter(double rateLimit, Meter meter) {
limiter = RateLimiter.create(rateLimit);
rateLimitRequestCounter = meter.counterBuilder(RATE_LIMIT_REQUEST_COUNT_METRIC_NAME)
.setDescription("Counter of HTTP requests processed by the rate limiting filter.")
.setUnit("{request}")
.build();
}

@Override
public void init(FilterConfig filterConfig) throws ServletException {
}
Expand All @@ -50,9 +71,11 @@ public void init(FilterConfig filterConfig) throws ServletException {
public void doFilter(ServletRequest request, ServletResponse response, FilterChain chain)
throws IOException, ServletException {
if (limiter.tryAcquire()) {
rateLimitRequestCounter.add(1, Result.ACCEPTED.attributes);
chain.doFilter(request, response);
} else {
httpRejectedRequests.inc();
rateLimitRequestCounter.add(1, Result.REJECTED.attributes);
HttpServletResponse httpResponse = (HttpServletResponse) response;
httpResponse.sendError(429, "Too Many Requests");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1320,7 +1320,8 @@ private synchronized void startLoadBalancerTasks() {
if (isRunning()) {
long resourceQuotaUpdateInterval = TimeUnit.MINUTES
.toMillis(getConfiguration().getLoadBalancerResourceQuotaUpdateIntervalMinutes());
loadSheddingTask = new LoadSheddingTask(loadManager, loadManagerExecutor, config);
loadSheddingTask = new LoadSheddingTask(loadManager, loadManagerExecutor,
config, getManagedLedgerFactory());
loadSheddingTask.start();
loadResourceQuotaTask = loadManagerExecutor.scheduleAtFixedRate(
new LoadResourceQuotaUpdaterTask(loadManager), resourceQuotaUpdateInterval,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -40,19 +42,27 @@ public class LoadSheddingTask implements Runnable {

private volatile ScheduledFuture<?> future;

private final ManagedLedgerFactory factory;

public LoadSheddingTask(AtomicReference<LoadManager> loadManager,
ScheduledExecutorService loadManagerExecutor,
ServiceConfiguration config) {
ServiceConfiguration config,
ManagedLedgerFactory factory) {
this.loadManager = loadManager;
this.loadManagerExecutor = loadManagerExecutor;
this.config = config;
this.factory = factory;
}

@Override
public void run() {
if (isCancel) {
return;
}
if (factory instanceof ManagedLedgerFactoryImpl
&& !((ManagedLedgerFactoryImpl) factory).isMetadataServiceAvailable()) {
return;
}
try {
loadManager.get().doLoadShedding();
} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
Expand All @@ -55,6 +56,7 @@
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.collections4.ListUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.mutable.MutableBoolean;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.PulsarService;
Expand Down Expand Up @@ -104,6 +106,7 @@
import org.apache.pulsar.common.policies.data.stats.TopicStatsImpl;
import org.apache.pulsar.common.policies.impl.NamespaceIsolationPolicies;
import org.apache.pulsar.common.stats.MetricsUtil;
import org.apache.pulsar.common.topics.TopicList;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
import org.apache.pulsar.metadata.api.MetadataCache;
Expand Down Expand Up @@ -187,6 +190,9 @@ public class NamespaceService implements AutoCloseable {
.register();
private final DoubleHistogram lookupLatencyHistogram;

private ConcurrentHashMap<String, CompletableFuture<List<String>>> inProgressQueryUserTopics =
new ConcurrentHashMap<>();

/**
* Default constructor.
*/
Expand Down Expand Up @@ -1509,6 +1515,23 @@ public CompletableFuture<List<String>> getListOfTopics(NamespaceName namespaceNa
}
}

public CompletableFuture<List<String>> getListOfUserTopics(NamespaceName namespaceName, Mode mode) {
String key = String.format("%s://%s", mode, namespaceName);
final MutableBoolean initializedByCurrentThread = new MutableBoolean();
CompletableFuture<List<String>> queryRes = inProgressQueryUserTopics.computeIfAbsent(key, k -> {
initializedByCurrentThread.setTrue();
return getListOfTopics(namespaceName, mode).thenApplyAsync(list -> {
return TopicList.filterSystemTopic(list);
}, pulsar.getExecutor());
});
if (initializedByCurrentThread.getValue()) {
queryRes.whenComplete((ignore, ex) -> {
inProgressQueryUserTopics.remove(key, queryRes);
});
}
return queryRes;
}

public CompletableFuture<List<String>> getAllPartitions(NamespaceName namespaceName) {
return getPartitions(namespaceName, TopicDomain.persistent)
.thenCombine(getPartitions(namespaceName, TopicDomain.non_persistent),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -625,6 +625,16 @@ public void start() throws Exception {
this.updateBrokerDispatchThrottlingMaxRate();
this.startCheckReplicationPolicies();
this.startDeduplicationSnapshotMonitor();
this.startClearInvalidateTopicNameCacheTask();
}

protected void startClearInvalidateTopicNameCacheTask() {
final int maxSecondsToClearTopicNameCache = pulsar.getConfiguration().getMaxSecondsToClearTopicNameCache();
inactivityMonitor.scheduleAtFixedRate(
() -> TopicName.clearIfReachedMaxCapacity(pulsar.getConfiguration().getTopicNameCacheMaxCapacity()),
maxSecondsToClearTopicNameCache,
maxSecondsToClearTopicNameCache,
TimeUnit.SECONDS);
}

protected void startStatsUpdater(int statsUpdateInitialDelayInSecs, int statsUpdateFrequencyInSecs) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ public class Consumer {

private static final double avgPercent = 0.9;
private boolean preciseDispatcherFlowControl;
private Position readPositionWhenJoining;
private Position lastSentPositionWhenJoining;
private final String clientAddress; // IP address only, no port number included
private final MessageId startMessageId;
private final boolean isAcknowledgmentAtBatchIndexLevelEnabled;
Expand Down Expand Up @@ -931,8 +931,8 @@ public ConsumerStatsImpl getStats() {
stats.unackedMessages = unackedMessages;
stats.blockedConsumerOnUnackedMsgs = blockedConsumerOnUnackedMsgs;
stats.avgMessagesPerEntry = getAvgMessagesPerEntry();
if (readPositionWhenJoining != null) {
stats.readPositionWhenJoining = readPositionWhenJoining.toString();
if (lastSentPositionWhenJoining != null) {
stats.lastSentPositionWhenJoining = lastSentPositionWhenJoining.toString();
}
return stats;
}
Expand Down Expand Up @@ -1166,8 +1166,8 @@ public boolean isPreciseDispatcherFlowControl() {
return preciseDispatcherFlowControl;
}

public void setReadPositionWhenJoining(Position readPositionWhenJoining) {
this.readPositionWhenJoining = readPositionWhenJoining;
public void setLastSentPositionWhenJoining(Position lastSentPositionWhenJoining) {
this.lastSentPositionWhenJoining = lastSentPositionWhenJoining;
}

public int getMaxUnackedMessages() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,7 @@
import org.apache.pulsar.common.util.netty.NettyChannelUtil;
import org.apache.pulsar.common.util.netty.NettyFutureUtil;
import org.apache.pulsar.functions.utils.Exceptions;
import org.apache.pulsar.metadata.api.MetadataStoreException;
import org.apache.pulsar.transaction.coordinator.TransactionCoordinatorID;
import org.apache.pulsar.transaction.coordinator.exceptions.CoordinatorException;
import org.apache.pulsar.transaction.coordinator.impl.MLTransactionMetadataStore;
Expand Down Expand Up @@ -663,7 +664,9 @@ protected void handlePartitionMetadataRequest(CommandPartitionedTopicMetadata pa
log.warn("Failed to get Partitioned Metadata [{}] {}: {}", remoteAddress,
topicName, ex.getMessage(), ex);
ServerError error = ServerError.ServiceNotReady;
if (ex instanceof RestException restException){
if (ex instanceof MetadataStoreException) {
error = ServerError.MetadataError;
} else if (ex instanceof RestException restException){
int responseCode = restException.getResponse().getStatus();
if (responseCode == NOT_FOUND.getStatusCode()){
error = ServerError.TopicNotFound;
Expand Down Expand Up @@ -2459,11 +2462,11 @@ protected void handleGetTopicsOfNamespace(CommandGetTopicsOfNamespace commandGet
if (lookupSemaphore.tryAcquire()) {
isNamespaceOperationAllowed(namespaceName, NamespaceOperation.GET_TOPICS).thenApply(isAuthorized -> {
if (isAuthorized) {
getBrokerService().pulsar().getNamespaceService().getListOfTopics(namespaceName, mode)
getBrokerService().pulsar().getNamespaceService().getListOfUserTopics(namespaceName, mode)
.thenAccept(topics -> {
boolean filterTopics = false;
// filter system topic
List<String> filteredTopics = TopicList.filterSystemTopic(topics);
List<String> filteredTopics = topics;

if (enableSubscriptionPatternEvaluation && topicsPattern.isPresent()) {
if (topicsPattern.get().length() <= maxSubscriptionPatternLength) {
Expand Down
Loading

0 comments on commit 3a8af2f

Please sign in to comment.