From d5e1ebc628259bdf3c1867f038cb8aed061164ab Mon Sep 17 00:00:00 2001 From: Matteo Merli Date: Thu, 18 Apr 2019 10:01:04 -0700 Subject: [PATCH] Allow to configure the delayed tracker implementation --- .../pulsar/broker/ServiceConfiguration.java | 3 + .../apache/pulsar/broker/PulsarService.java | 25 ++----- .../delayed/DelayedDeliveryTracker.java | 67 +++++++++++++++++++ .../DelayedDeliveryTrackerFactory.java | 55 +++++++++++++++ .../delayed/DelayedDeliveryTrackerLoader.java | 49 ++++++++++++++ .../InMemoryDelayedDeliveryTracker.java} | 18 +++-- ...InMemoryDelayedDeliveryTrackerFactory.java | 55 +++++++++++++++ .../broker/loadbalance/LoadManager.java | 10 ++- .../pulsar/broker/service/BrokerService.java | 13 ++++ .../pulsar/broker/service/Consumer.java | 18 ----- ...PersistentDispatcherMultipleConsumers.java | 7 +- ...sistentDispatcherSingleActiveConsumer.java | 6 +- ...PersistentDispatcherMultipleConsumers.java | 5 +- 13 files changed, 273 insertions(+), 58 deletions(-) create mode 100644 pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/DelayedDeliveryTracker.java create mode 100644 pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/DelayedDeliveryTrackerFactory.java create mode 100644 pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/DelayedDeliveryTrackerLoader.java rename pulsar-broker/src/main/java/org/apache/pulsar/broker/{service/persistent/DelayedDeliveryTracker.java => delayed/InMemoryDelayedDeliveryTracker.java} (91%) create mode 100644 pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTrackerFactory.java diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java index 94d55c7d4ca5b..532d912eb9c44 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java @@ -157,6 +157,9 @@ public class ServiceConfiguration implements PulsarConfiguration { @FieldContext(category = CATEGORY_SERVER, doc = "Whether to enable the delayed delivery for messages.") private boolean delayedDeliveryEnabled = true; + @FieldContext(category = CATEGORY_SERVER, doc = "Class name of the factory that implements the delayed deliver tracker") + private String delayedDeliveryTrackerFactoryClassName = "org.apache.pulsar.broker.delayed.InMemoryDelayedDeliveryTrackerFactory"; + @FieldContext(category = CATEGORY_SERVER, doc = "Control the tick time for when retrying on delayed delivery, " + " affecting the accuracy of the delivery time compared to the scheduled time. Default is 1 second.") private long delayedDeliveryTickTimeMillis = 1000; diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java index 0de81db30c44b..934ce109f0939 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java @@ -28,13 +28,14 @@ import com.google.common.collect.Maps; import com.google.common.collect.Sets; -import io.netty.util.HashedWheelTimer; -import io.netty.util.Timer; import io.netty.util.concurrent.DefaultThreadFactory; import java.io.IOException; import java.net.URI; -import java.util.*; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; @@ -97,9 +98,9 @@ import org.apache.pulsar.common.util.ObjectMapperFactory; import org.apache.pulsar.compaction.Compactor; import org.apache.pulsar.compaction.TwoPhaseCompactor; -import org.apache.pulsar.functions.worker.WorkerUtils; import org.apache.pulsar.functions.worker.WorkerConfig; import org.apache.pulsar.functions.worker.WorkerService; +import org.apache.pulsar.functions.worker.WorkerUtils; import org.apache.pulsar.websocket.WebSocketConsumerServlet; import org.apache.pulsar.websocket.WebSocketProducerServlet; import org.apache.pulsar.websocket.WebSocketReaderServlet; @@ -168,8 +169,6 @@ public class PulsarService implements AutoCloseable { private SchemaRegistryService schemaRegistryService = null; private final Optional functionWorkerService; - private Optional delayedDeliveryTimer = Optional.empty(); - private final MessagingServiceShutdownHook shutdownService; private MetricsGenerator metricsGenerator; @@ -218,10 +217,6 @@ public void close() throws PulsarServerException { return; } - if (delayedDeliveryTimer.isPresent()) { - delayedDeliveryTimer.get().stop(); - } - // close the service in reverse order v.s. in which they are started if (this.webService != null) { this.webService.close(); @@ -950,16 +945,6 @@ public SchemaRegistryService getSchemaRegistryService() { return schemaRegistryService; } - public synchronized Timer getDelayedDeliveryTimer() { - // Lazy initialize - if (!delayedDeliveryTimer.isPresent()) { - delayedDeliveryTimer = Optional.of(new HashedWheelTimer(new DefaultThreadFactory("pulsar-delayed-delivery"), - config.getDelayedDeliveryTickTimeMillis(), TimeUnit.MILLISECONDS)); - } - - return delayedDeliveryTimer.get(); - } - private void startWorkerService(AuthenticationService authenticationService, AuthorizationService authorizationService) throws InterruptedException, IOException, KeeperException { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/DelayedDeliveryTracker.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/DelayedDeliveryTracker.java new file mode 100644 index 0000000000000..836a2ff57d528 --- /dev/null +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/DelayedDeliveryTracker.java @@ -0,0 +1,67 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.delayed; + +import com.google.common.annotations.Beta; + +import java.util.Set; + +import org.apache.bookkeeper.mledger.impl.PositionImpl; + +/** + * Represent the tracker for the delayed delivery of messages for a particular subscription. + * + * Note: this interface is still being refined and some breaking changes might be introduced. + */ +@Beta +public interface DelayedDeliveryTracker extends AutoCloseable { + + /** + * Add a message to the tracker + * + * @param ledgerId + * the ledgerId + * @param entryId + * the entryId + * @param deliveryAt + * the absolute timestamp at which the message should be tracked + * @return true if the message was added to the tracker or false if it should be delivered immediately + */ + boolean addMessage(long ledgerId, long entryId, long deliveryAt); + + /** + * Return true if there's at least a message that is scheduled to be delivered already + */ + boolean hasMessageAvailable(); + + /** + * @return the number of delayed messages being tracked + */ + long getNumberOfDelayedMessages(); + + /** + * Get a set of position of messages that have already reached the delivery time + */ + Set getScheduledMessages(int maxMessages); + + /** + * Close the subscription tracker and release all resources. + */ + void close(); +} diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/DelayedDeliveryTrackerFactory.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/DelayedDeliveryTrackerFactory.java new file mode 100644 index 0000000000000..b0a16a571e10f --- /dev/null +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/DelayedDeliveryTrackerFactory.java @@ -0,0 +1,55 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.delayed; + +import com.google.common.annotations.Beta; + +import java.io.IOException; + +import org.apache.pulsar.broker.ServiceConfiguration; +import org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers; + +/** + * Factory of InMemoryDelayedDeliveryTracker objects. This is the entry point for implementations. + * + * Note: this interface is still being refined and some breaking changes might be introduced. + */ +@Beta +public interface DelayedDeliveryTrackerFactory extends AutoCloseable { + /** + * Initialize the factory implementation from the broker service configuration + * + * @param config + * the broker service config object + */ + void initialize(ServiceConfiguration config) throws IOException; + + /** + * Create a new tracker instance. + * + * @param dispatcher + * a multi-consumer dispatcher instance + */ + DelayedDeliveryTracker newTracker(PersistentDispatcherMultipleConsumers dispatcher); + + /** + * Close the factory and release all the resources + */ + void close() throws IOException; +} diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/DelayedDeliveryTrackerLoader.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/DelayedDeliveryTrackerLoader.java new file mode 100644 index 0000000000000..db4cfe6426329 --- /dev/null +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/DelayedDeliveryTrackerLoader.java @@ -0,0 +1,49 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.delayed; + +import static com.google.common.base.Preconditions.checkArgument; + +import java.io.IOException; + +import lombok.experimental.UtilityClass; + +import org.apache.pulsar.broker.ServiceConfiguration; + +@UtilityClass +public class DelayedDeliveryTrackerLoader { + public static DelayedDeliveryTrackerFactory loadDelayedDeliveryTrackerFactory(ServiceConfiguration conf) + throws IOException { + // try { + Class factoryClass; + try { + factoryClass = Class.forName(conf.getDelayedDeliveryTrackerFactoryClassName()); + Object obj = factoryClass.newInstance(); + checkArgument(obj instanceof DelayedDeliveryTrackerFactory, + "The factory has to be an instance of " + DelayedDeliveryTrackerFactory.class.getName()); + + DelayedDeliveryTrackerFactory factory = (DelayedDeliveryTrackerFactory) obj; + factory.initialize(conf); + return factory; + + } catch (Exception e) { + throw new IOException(e); + } + } +} diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/DelayedDeliveryTracker.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTracker.java similarity index 91% rename from pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/DelayedDeliveryTracker.java rename to pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTracker.java index 3f2709e53b78e..2dd6ff794e041 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/DelayedDeliveryTracker.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTracker.java @@ -16,7 +16,7 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.pulsar.broker.service.persistent; +package org.apache.pulsar.broker.delayed; import io.netty.util.Timeout; import io.netty.util.Timer; @@ -29,10 +29,11 @@ import lombok.extern.slf4j.Slf4j; import org.apache.bookkeeper.mledger.impl.PositionImpl; +import org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers; import org.apache.pulsar.common.util.collections.TripleLongPriorityQueue; @Slf4j -public class DelayedDeliveryTracker implements AutoCloseable, TimerTask { +public class InMemoryDelayedDeliveryTracker implements DelayedDeliveryTracker, TimerTask { private final TripleLongPriorityQueue priorityQueue = new TripleLongPriorityQueue(); @@ -49,13 +50,13 @@ public class DelayedDeliveryTracker implements AutoCloseable, TimerTask { private final long tickTimeMillis; - public DelayedDeliveryTracker(PersistentDispatcherMultipleConsumers dispatcher) { + InMemoryDelayedDeliveryTracker(PersistentDispatcherMultipleConsumers dispatcher, Timer timer, long tickTimeMillis) { this.dispatcher = dispatcher; - this.timer = dispatcher.getTopic().getBrokerService().pulsar().getDelayedDeliveryTimer(); - this.tickTimeMillis = dispatcher.getTopic().getBrokerService().pulsar().getConfiguration() - .getDelayedDeliveryTickTimeMillis(); + this.timer = timer; + this.tickTimeMillis = tickTimeMillis; } + @Override public boolean addMessage(long ledgerId, long entryId, long deliveryAt) { long now = System.currentTimeMillis(); if (log.isDebugEnabled()) { @@ -75,6 +76,7 @@ public boolean addMessage(long ledgerId, long entryId, long deliveryAt) { /** * Return true if there's at least a message that is scheduled to be delivered already */ + @Override public boolean hasMessageAvailable() { return !priorityQueue.isEmpty() && priorityQueue.peekN1() <= System.currentTimeMillis(); } @@ -82,6 +84,7 @@ public boolean hasMessageAvailable() { /** * Get a set of position of messages that have already reached */ + @Override public Set getScheduledMessages(int maxMessages) { int n = maxMessages; Set positions = new TreeSet<>(); @@ -111,7 +114,8 @@ public Set getScheduledMessages(int maxMessages) { return positions; } - public long size() { + @Override + public long getNumberOfDelayedMessages() { return priorityQueue.size(); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTrackerFactory.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTrackerFactory.java new file mode 100644 index 0000000000000..71f0fc293d2e1 --- /dev/null +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTrackerFactory.java @@ -0,0 +1,55 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.delayed; + +import io.netty.util.HashedWheelTimer; +import io.netty.util.Timer; +import io.netty.util.concurrent.DefaultThreadFactory; + +import java.util.concurrent.TimeUnit; + +import org.apache.pulsar.broker.ServiceConfiguration; +import org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers; + +public class InMemoryDelayedDeliveryTrackerFactory implements DelayedDeliveryTrackerFactory { + + private Timer timer; + + private long tickTimeMillis; + + @Override + public void initialize(ServiceConfiguration config) { + this.timer = new HashedWheelTimer(new DefaultThreadFactory("pulsar-delayed-delivery"), + config.getDelayedDeliveryTickTimeMillis(), TimeUnit.MILLISECONDS); + this.tickTimeMillis = config.getDelayedDeliveryTickTimeMillis(); + } + + @Override + public DelayedDeliveryTracker newTracker(PersistentDispatcherMultipleConsumers dispatcher) { + return new InMemoryDelayedDeliveryTracker(dispatcher, timer, tickTimeMillis); + } + + @Override + public void close() { + if (timer != null) { + timer.stop(); + } + } + +} diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/LoadManager.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/LoadManager.java index 6a9dc38f33256..b6493e13a49b3 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/LoadManager.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/LoadManager.java @@ -108,12 +108,12 @@ public interface LoadManager { * @throws Exception */ public void disableBroker() throws Exception; - + /** * Get list of available brokers in cluster - * + * * @return - * @throws Exception + * @throws Exception */ Set getAvailableBrokers() throws Exception; @@ -134,9 +134,7 @@ static LoadManager create(final PulsarService pulsar) { // Assume there is a constructor with one argument of PulsarService. final Object loadManagerInstance = loadManagerClass.newInstance(); if (loadManagerInstance instanceof LoadManager) { - final LoadManager casted = (LoadManager) loadManagerInstance; - casted.initialize(pulsar); - return casted; + return (LoadManager) loadManagerInstance; } else if (loadManagerInstance instanceof ModularLoadManager) { final LoadManager casted = new ModularLoadManagerWrapper((ModularLoadManager) loadManagerInstance); casted.initialize(pulsar); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java index fddda96351f72..0f6f5bd66c6e4 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java @@ -80,6 +80,8 @@ import org.apache.pulsar.broker.admin.AdminResource; import org.apache.pulsar.broker.authentication.AuthenticationService; import org.apache.pulsar.broker.authorization.AuthorizationService; +import org.apache.pulsar.broker.delayed.DelayedDeliveryTrackerFactory; +import org.apache.pulsar.broker.delayed.DelayedDeliveryTrackerLoader; import org.apache.pulsar.broker.loadbalance.LoadManager; import org.apache.pulsar.broker.service.BrokerServiceException.NamingException; import org.apache.pulsar.broker.service.BrokerServiceException.NotAllowedException; @@ -189,6 +191,8 @@ public class BrokerService implements Closeable, ZooKeeperCacheListener blockedDispatchers; private final ReadWriteLock lock = new ReentrantReadWriteLock(); + private final DelayedDeliveryTrackerFactory delayedDeliveryTrackerFactory; + public BrokerService(PulsarService pulsar) throws Exception { this.pulsar = pulsar; this.managedLedgerFactory = pulsar.getManagedLedgerFactory(); @@ -271,9 +275,13 @@ public void recordLatency(EventType eventType, long latencyMs) { pulsarStats.recordZkLatencyTimeValue(eventType, latencyMs); } }; + + this.delayedDeliveryTrackerFactory = DelayedDeliveryTrackerLoader + .loadDelayedDeliveryTrackerFactory(pulsar.getConfiguration()); } public void start() throws Exception { + this.delayedDeliveryTrackerFactory.initialize(pulsar.getConfiguration()); this.producerNameGenerator = new DistributedIdGenerator(pulsar.getZkClient(), producerNameGeneratorPath, pulsar.getConfiguration().getClusterName()); @@ -413,6 +421,7 @@ public void close() throws IOException { ClientCnxnAspect.removeListener(zkStatsListener); ClientCnxnAspect.registerExecutor(null); topicOrderedExecutor.shutdown(); + delayedDeliveryTrackerFactory.close(); log.info("Broker service completely shut down"); } @@ -1375,6 +1384,10 @@ public void onUpdate(String path, Map data, Stat stat) { } + public DelayedDeliveryTrackerFactory getDelayedDeliveryTrackerFactory() { + return delayedDeliveryTrackerFactory; + } + public static List getDynamicConfiguration() { return dynamicConfigurationMap.keys(); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java index 9fca8a8674c18..a62b2c5b416b9 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java @@ -283,24 +283,6 @@ private void incrementUnackedMessages(int ackedMessages) { } } - public static int getBatchSizeforEntry(ByteBuf metadataAndPayload, Subscription subscription, long consumerId) { - try { - // save the reader index and restore after parsing - metadataAndPayload.markReaderIndex(); - PulsarApi.MessageMetadata metadata = Commands.parseMessageMetadata(metadataAndPayload); - metadataAndPayload.resetReaderIndex(); - int batchSize = metadata.getNumMessagesInBatch(); - metadata.recycle(); - if (log.isDebugEnabled()) { - log.debug("[{}] [{}] num messages in batch are {} ", subscription, consumerId, batchSize); - } - return batchSize; - } catch (Throwable t) { - log.error("[{}] [{}] Failed to parse message metadata", subscription, consumerId, t); - } - return -1; - } - public static MessageMetadata peekMessageMetadata(ByteBuf metadataAndPayload, Subscription subscription, long consumerId) { try { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherMultipleConsumers.java index 2067a80d8efc9..bf8c256534655 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherMultipleConsumers.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherMultipleConsumers.java @@ -18,8 +18,6 @@ */ package org.apache.pulsar.broker.service.nonpersistent; -import static org.apache.pulsar.broker.service.Consumer.getBatchSizeforEntry; - import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; @@ -35,6 +33,7 @@ import org.apache.pulsar.broker.service.RedeliveryTrackerDisabled; import org.apache.pulsar.broker.service.Subscription; import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.SubType; +import org.apache.pulsar.common.api.proto.PulsarApi.MessageMetadata; import org.apache.pulsar.utils.CopyOnWriteArrayList; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -194,7 +193,9 @@ public void sendMessages(List entries) { TOTAL_AVAILABLE_PERMITS_UPDATER.addAndGet(this, -consumer.sendMessages(entries).getTotalSentMessages()); } else { entries.forEach(entry -> { - int totalMsgs = getBatchSizeforEntry(entry.getDataBuffer(), subscription, -1); + MessageMetadata msgMetatada = Consumer.peekMessageMetadata(entry.getDataBuffer(), subscription, -1); + int totalMsgs = msgMetatada.getNumMessagesInBatch(); + msgMetatada.recycle(); if (totalMsgs > 0) { msgDrop.recordEvent(); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherSingleActiveConsumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherSingleActiveConsumer.java index 787fb00a940ed..4cdf91d8cadf3 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherSingleActiveConsumer.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherSingleActiveConsumer.java @@ -19,7 +19,6 @@ package org.apache.pulsar.broker.service.nonpersistent; import static org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES; -import static org.apache.pulsar.broker.service.Consumer.getBatchSizeforEntry; import java.util.List; @@ -32,6 +31,7 @@ import org.apache.pulsar.broker.service.RedeliveryTracker; import org.apache.pulsar.broker.service.RedeliveryTrackerDisabled; import org.apache.pulsar.broker.service.Subscription; +import org.apache.pulsar.common.api.proto.PulsarApi.MessageMetadata; import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.SubType; import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.policies.data.Policies; @@ -61,7 +61,9 @@ public void sendMessages(List entries) { currentConsumer.sendMessages(entries); } else { entries.forEach(entry -> { - int totalMsgs = getBatchSizeforEntry(entry.getDataBuffer(), subscription, -1); + MessageMetadata msgMetatada = Consumer.peekMessageMetadata(entry.getDataBuffer(), subscription, -1); + int totalMsgs = msgMetatada.getNumMessagesInBatch(); + msgMetatada.recycle(); if (totalMsgs > 0) { msgDrop.recordEvent(); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java index 31cb4c751dc5f..6001ef3c03b14 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java @@ -42,6 +42,7 @@ import org.apache.bookkeeper.mledger.impl.PositionImpl; import org.apache.pulsar.broker.ServiceConfiguration; import org.apache.pulsar.broker.admin.AdminResource; +import org.apache.pulsar.broker.delayed.DelayedDeliveryTracker; import org.apache.pulsar.broker.service.AbstractDispatcherMultipleConsumers; import org.apache.pulsar.broker.service.BrokerServiceException; import org.apache.pulsar.broker.service.BrokerServiceException.ConsumerBusyException; @@ -664,7 +665,7 @@ public synchronized boolean trackDelayedDelivery(long ledgerId, long entryId, Me if (!delayedDeliveryTracker.isPresent()) { // Initialize the tracker the first time we need to use it - delayedDeliveryTracker = Optional.of(new DelayedDeliveryTracker(this)); + delayedDeliveryTracker = Optional.of(topic.getBrokerService().getDelayedDeliveryTrackerFactory().newTracker(this)); } return delayedDeliveryTracker.get().addMessage(ledgerId, entryId, msgMetadata.getDeliverAtTime()); @@ -698,7 +699,7 @@ private Set getMessagesToReplayNow(int maxMessagesToRead) { public long getNumberOfDelayedMessages() { if (delayedDeliveryTracker.isPresent()) { - return delayedDeliveryTracker.get().size(); + return delayedDeliveryTracker.get().getNumberOfDelayedMessages(); } else { return 0; }