diff --git a/conf/broker.conf b/conf/broker.conf index ae95d48ae197b..02b425e8992f0 100644 --- a/conf/broker.conf +++ b/conf/broker.conf @@ -259,6 +259,16 @@ maxMessageSize=5242880 # Interval between checks to see if topics with compaction policies need to be compacted brokerServiceCompactionMonitorIntervalInSeconds=60 +# Whether to enable the delayed delivery for messages. +# If disabled, messages will be immediately delivered and there will +# be no tracking overhead. +delayedDeliveryEnabled=true + +# Control the tick time for when retrying on delayed delivery, +# affecting the accuracy of the delivery time compared to the scheduled time. +# Default is 1 second. +delayedDeliveryTickTimeMillis=1000 + # Enable tracking of replicated subscriptions state across clusters. enableReplicatedSubscriptions=true @@ -426,7 +436,7 @@ bookkeeperClientReorderReadSequenceEnabled=false # outside the specified groups will not be used by the broker bookkeeperClientIsolationGroups= -# Enable bookie secondary-isolation group if bookkeeperClientIsolationGroups doesn't +# Enable bookie secondary-isolation group if bookkeeperClientIsolationGroups doesn't # have enough bookie available. bookkeeperClientSecondaryIsolationGroups= diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java index 91ef0da7e1a24..9868dd5b02442 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java @@ -155,6 +155,16 @@ public class ServiceConfiguration implements PulsarConfiguration { // waiting for another HTTP call to complete in same thread. private int numHttpServerThreads = Math.max(8, 2 * Runtime.getRuntime().availableProcessors()); + @FieldContext(category = CATEGORY_SERVER, doc = "Whether to enable the delayed delivery for messages.") + private boolean delayedDeliveryEnabled = true; + + @FieldContext(category = CATEGORY_SERVER, doc = "Class name of the factory that implements the delayed deliver tracker") + private String delayedDeliveryTrackerFactoryClassName = "org.apache.pulsar.broker.delayed.InMemoryDelayedDeliveryTrackerFactory"; + + @FieldContext(category = CATEGORY_SERVER, doc = "Control the tick time for when retrying on delayed delivery, " + + " affecting the accuracy of the delivery time compared to the scheduled time. Default is 1 second.") + private long delayedDeliveryTickTimeMillis = 1000; + @FieldContext( category = CATEGORY_WEBSOCKET, doc = "Enable the WebSocket API service in broker" diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java index 52c43c40da5cc..934ce109f0939 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java @@ -32,7 +32,10 @@ import java.io.IOException; import java.net.URI; -import java.util.*; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; @@ -95,9 +98,9 @@ import org.apache.pulsar.common.util.ObjectMapperFactory; import org.apache.pulsar.compaction.Compactor; import org.apache.pulsar.compaction.TwoPhaseCompactor; -import org.apache.pulsar.functions.worker.WorkerUtils; import org.apache.pulsar.functions.worker.WorkerConfig; import org.apache.pulsar.functions.worker.WorkerService; +import org.apache.pulsar.functions.worker.WorkerUtils; import org.apache.pulsar.websocket.WebSocketConsumerServlet; import org.apache.pulsar.websocket.WebSocketProducerServlet; import org.apache.pulsar.websocket.WebSocketReaderServlet; @@ -543,7 +546,7 @@ private void startZkCacheService() throws PulsarServerException { this.localZkCache = new LocalZooKeeperCache(getZkClient(), config.getZooKeeperOperationTimeoutSeconds(), getOrderedExecutor()); this.globalZkCache = new GlobalZooKeeperCache(getZooKeeperClientFactory(), - (int) config.getZooKeeperSessionTimeoutMillis(), + (int) config.getZooKeeperSessionTimeoutMillis(), config.getZooKeeperOperationTimeoutSeconds(), config.getConfigurationStoreServers(), getOrderedExecutor(), this.cacheExecutor); try { @@ -883,7 +886,7 @@ public static String brokerUrlTls(String host, int port) { } public static String webAddress(ServiceConfiguration config) { - if (config.getWebServicePort().isPresent()) { + if (config.getWebServicePort().isPresent()) { return webAddress(advertisedAddress(config), config.getWebServicePort().get()); } else { return null; diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/DelayedDeliveryTracker.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/DelayedDeliveryTracker.java new file mode 100644 index 0000000000000..836a2ff57d528 --- /dev/null +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/DelayedDeliveryTracker.java @@ -0,0 +1,67 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.delayed; + +import com.google.common.annotations.Beta; + +import java.util.Set; + +import org.apache.bookkeeper.mledger.impl.PositionImpl; + +/** + * Represent the tracker for the delayed delivery of messages for a particular subscription. + * + * Note: this interface is still being refined and some breaking changes might be introduced. + */ +@Beta +public interface DelayedDeliveryTracker extends AutoCloseable { + + /** + * Add a message to the tracker + * + * @param ledgerId + * the ledgerId + * @param entryId + * the entryId + * @param deliveryAt + * the absolute timestamp at which the message should be tracked + * @return true if the message was added to the tracker or false if it should be delivered immediately + */ + boolean addMessage(long ledgerId, long entryId, long deliveryAt); + + /** + * Return true if there's at least a message that is scheduled to be delivered already + */ + boolean hasMessageAvailable(); + + /** + * @return the number of delayed messages being tracked + */ + long getNumberOfDelayedMessages(); + + /** + * Get a set of position of messages that have already reached the delivery time + */ + Set getScheduledMessages(int maxMessages); + + /** + * Close the subscription tracker and release all resources. + */ + void close(); +} diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/DelayedDeliveryTrackerFactory.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/DelayedDeliveryTrackerFactory.java new file mode 100644 index 0000000000000..b0a16a571e10f --- /dev/null +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/DelayedDeliveryTrackerFactory.java @@ -0,0 +1,55 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.delayed; + +import com.google.common.annotations.Beta; + +import java.io.IOException; + +import org.apache.pulsar.broker.ServiceConfiguration; +import org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers; + +/** + * Factory of InMemoryDelayedDeliveryTracker objects. This is the entry point for implementations. + * + * Note: this interface is still being refined and some breaking changes might be introduced. + */ +@Beta +public interface DelayedDeliveryTrackerFactory extends AutoCloseable { + /** + * Initialize the factory implementation from the broker service configuration + * + * @param config + * the broker service config object + */ + void initialize(ServiceConfiguration config) throws IOException; + + /** + * Create a new tracker instance. + * + * @param dispatcher + * a multi-consumer dispatcher instance + */ + DelayedDeliveryTracker newTracker(PersistentDispatcherMultipleConsumers dispatcher); + + /** + * Close the factory and release all the resources + */ + void close() throws IOException; +} diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/DelayedDeliveryTrackerLoader.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/DelayedDeliveryTrackerLoader.java new file mode 100644 index 0000000000000..90b444d2d5d4a --- /dev/null +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/DelayedDeliveryTrackerLoader.java @@ -0,0 +1,47 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.delayed; + +import static com.google.common.base.Preconditions.checkArgument; + +import java.io.IOException; + +import lombok.experimental.UtilityClass; + +import org.apache.pulsar.broker.ServiceConfiguration; + +@UtilityClass +public class DelayedDeliveryTrackerLoader { + public static DelayedDeliveryTrackerFactory loadDelayedDeliveryTrackerFactory(ServiceConfiguration conf) + throws IOException { + Class factoryClass; + try { + factoryClass = Class.forName(conf.getDelayedDeliveryTrackerFactoryClassName()); + Object obj = factoryClass.newInstance(); + checkArgument(obj instanceof DelayedDeliveryTrackerFactory, + "The factory has to be an instance of " + DelayedDeliveryTrackerFactory.class.getName()); + + DelayedDeliveryTrackerFactory factory = (DelayedDeliveryTrackerFactory) obj; + factory.initialize(conf); + return factory; + } catch (Exception e) { + throw new IOException(e); + } + } +} diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTracker.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTracker.java new file mode 100644 index 0000000000000..c692c3ac77ae7 --- /dev/null +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTracker.java @@ -0,0 +1,193 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.delayed; + +import io.netty.util.Timeout; +import io.netty.util.Timer; +import io.netty.util.TimerTask; + +import java.time.Clock; +import java.util.Set; +import java.util.TreeSet; +import java.util.concurrent.TimeUnit; + +import lombok.extern.slf4j.Slf4j; + +import org.apache.bookkeeper.mledger.impl.PositionImpl; +import org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers; +import org.apache.pulsar.common.util.collections.TripleLongPriorityQueue; + +@Slf4j +public class InMemoryDelayedDeliveryTracker implements DelayedDeliveryTracker, TimerTask { + + private final TripleLongPriorityQueue priorityQueue = new TripleLongPriorityQueue(); + + private final PersistentDispatcherMultipleConsumers dispatcher; + + // Reference to the shared (per-broker) timer for delayed delivery + private final Timer timer; + + // Current timeout or null if not set + private Timeout timeout; + + // Timestamp at which the timeout is currently set + private long currentTimeoutTarget; + + private final long tickTimeMillis; + + private final Clock clock; + + InMemoryDelayedDeliveryTracker(PersistentDispatcherMultipleConsumers dispatcher, Timer timer, long tickTimeMillis) { + this(dispatcher, timer, tickTimeMillis, Clock.systemUTC()); + } + + InMemoryDelayedDeliveryTracker(PersistentDispatcherMultipleConsumers dispatcher, Timer timer, long tickTimeMillis, Clock clock) { + this.dispatcher = dispatcher; + this.timer = timer; + this.tickTimeMillis = tickTimeMillis; + this.clock = clock; + } + + @Override + public boolean addMessage(long ledgerId, long entryId, long deliveryAt) { + long now = clock.millis(); + if (log.isDebugEnabled()) { + log.debug("[{}] Add message {}:{} -- Delivery in {} ms ", dispatcher.getName(), ledgerId, entryId, + deliveryAt - now); + } + if (deliveryAt < now) { + // It's already about time to deliver this message + return false; + } + + priorityQueue.add(deliveryAt, ledgerId, entryId); + updateTimer(); + return true; + } + + /** + * Return true if there's at least a message that is scheduled to be delivered already + */ + @Override + public boolean hasMessageAvailable() { + return !priorityQueue.isEmpty() && priorityQueue.peekN1() <= clock.millis(); + } + + /** + * Get a set of position of messages that have already reached + */ + @Override + public Set getScheduledMessages(int maxMessages) { + int n = maxMessages; + Set positions = new TreeSet<>(); + long now = clock.millis(); + // Pick all the messages that will be ready within the tick time period. + // This is to avoid keeping rescheduling the timer for each message at + // very short delay + long cutoffTime = now + tickTimeMillis; + + while (n > 0 && !priorityQueue.isEmpty()) { + long timestamp = priorityQueue.peekN1(); + if (timestamp > cutoffTime) { + break; + } + + long ledgerId = priorityQueue.peekN2(); + long entryId = priorityQueue.peekN3(); + positions.add(new PositionImpl(ledgerId, entryId)); + + priorityQueue.pop(); + --n; + } + + if (log.isDebugEnabled()) { + log.debug("[{}] Get scheduled messags - found {}", dispatcher.getName(), positions.size()); + } + updateTimer(); + return positions; + } + + @Override + public long getNumberOfDelayedMessages() { + return priorityQueue.size(); + } + + private void updateTimer() { + if (priorityQueue.isEmpty()) { + if (timeout != null) { + currentTimeoutTarget = -1; + timeout.cancel(); + timeout = null; + } + return; + } + + long timestamp = priorityQueue.peekN1(); + if (timestamp == currentTimeoutTarget) { + // The timer is already set to the correct target time + return; + } + + if (timeout != null) { + timeout.cancel(); + } + + long delayMillis = timestamp - clock.millis(); + if (log.isDebugEnabled()) { + log.debug("[{}] Start timer in {} millis", dispatcher.getName(), delayMillis); + } + + if (delayMillis < 0) { + // There are messages that are already ready to be delivered. If + // the dispatcher is not getting them is because the consumer is + // either not connected or slow. + // We don't need to keep retriggering the timer. When the consumer + // catches up, the dispatcher will do the readMoreEntries() and + // get these messages + return; + } + + currentTimeoutTarget = timestamp; + timeout = timer.newTimeout(this, delayMillis, TimeUnit.MILLISECONDS); + } + + @Override + public void run(Timeout timeout) throws Exception { + if (log.isDebugEnabled()) { + log.info("[{}] Timer triggered", dispatcher.getName()); + } + if (timeout.isCancelled()) { + return; + } + + synchronized (dispatcher) { + currentTimeoutTarget = -1; + timeout = null; + dispatcher.readMoreEntries(); + } + } + + @Override + public void close() { + priorityQueue.close(); + if (timeout != null) { + timeout.cancel(); + } + } +} diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTrackerFactory.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTrackerFactory.java new file mode 100644 index 0000000000000..71f0fc293d2e1 --- /dev/null +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTrackerFactory.java @@ -0,0 +1,55 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.delayed; + +import io.netty.util.HashedWheelTimer; +import io.netty.util.Timer; +import io.netty.util.concurrent.DefaultThreadFactory; + +import java.util.concurrent.TimeUnit; + +import org.apache.pulsar.broker.ServiceConfiguration; +import org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers; + +public class InMemoryDelayedDeliveryTrackerFactory implements DelayedDeliveryTrackerFactory { + + private Timer timer; + + private long tickTimeMillis; + + @Override + public void initialize(ServiceConfiguration config) { + this.timer = new HashedWheelTimer(new DefaultThreadFactory("pulsar-delayed-delivery"), + config.getDelayedDeliveryTickTimeMillis(), TimeUnit.MILLISECONDS); + this.tickTimeMillis = config.getDelayedDeliveryTickTimeMillis(); + } + + @Override + public DelayedDeliveryTracker newTracker(PersistentDispatcherMultipleConsumers dispatcher) { + return new InMemoryDelayedDeliveryTracker(dispatcher, timer, tickTimeMillis); + } + + @Override + public void close() { + if (timer != null) { + timer.stop(); + } + } + +} diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java index 06f57530e3596..92ce2b0d7af09 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java @@ -31,7 +31,7 @@ import org.apache.pulsar.common.api.proto.PulsarApi.CommandAck.AckType; import org.apache.pulsar.common.api.proto.PulsarApi.MessageMetadata; -public abstract class AbstractBaseDispatcher { +public abstract class AbstractBaseDispatcher implements Dispatcher { protected final Subscription subscription; @@ -80,6 +80,12 @@ public void filterEntriesForConsumer(List entries, EntryBatchSizes batchS subscription.acknowledgeMessage(Collections.singletonList(pos), AckType.Individual, Collections.emptyMap()); continue; + } else if (msgMetadata.hasDeliverAtTime() + && trackDelayedDelivery(entry.getLedgerId(), entry.getEntryId(), msgMetadata)) { + // The message is marked for delayed delivery. Ignore for now. + entries.set(i, null); + entry.release(); + continue; } int batchSize = msgMetadata.getNumMessagesInBatch(); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java index 1890efadff51d..c1556fa07f706 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java @@ -80,6 +80,8 @@ import org.apache.pulsar.broker.admin.AdminResource; import org.apache.pulsar.broker.authentication.AuthenticationService; import org.apache.pulsar.broker.authorization.AuthorizationService; +import org.apache.pulsar.broker.delayed.DelayedDeliveryTrackerFactory; +import org.apache.pulsar.broker.delayed.DelayedDeliveryTrackerLoader; import org.apache.pulsar.broker.loadbalance.LoadManager; import org.apache.pulsar.broker.service.BrokerServiceException.NamingException; import org.apache.pulsar.broker.service.BrokerServiceException.NotAllowedException; @@ -189,6 +191,8 @@ public class BrokerService implements Closeable, ZooKeeperCacheListener blockedDispatchers; private final ReadWriteLock lock = new ReentrantReadWriteLock(); + private final DelayedDeliveryTrackerFactory delayedDeliveryTrackerFactory; + public BrokerService(PulsarService pulsar) throws Exception { this.pulsar = pulsar; this.managedLedgerFactory = pulsar.getManagedLedgerFactory(); @@ -271,6 +275,9 @@ public void recordLatency(EventType eventType, long latencyMs) { pulsarStats.recordZkLatencyTimeValue(eventType, latencyMs); } }; + + this.delayedDeliveryTrackerFactory = DelayedDeliveryTrackerLoader + .loadDelayedDeliveryTrackerFactory(pulsar.getConfiguration()); } public void start() throws Exception { @@ -413,6 +420,7 @@ public void close() throws IOException { ClientCnxnAspect.removeListener(zkStatsListener); ClientCnxnAspect.registerExecutor(null); topicOrderedExecutor.shutdown(); + delayedDeliveryTrackerFactory.close(); log.info("Broker service completely shut down"); } @@ -1397,6 +1405,10 @@ public void onUpdate(String path, Map data, Stat stat) { } + public DelayedDeliveryTrackerFactory getDelayedDeliveryTrackerFactory() { + return delayedDeliveryTrackerFactory; + } + public static List getDynamicConfiguration() { return dynamicConfigurationMap.keys(); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Dispatcher.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Dispatcher.java index f9271c750bbbf..3f839b17197c5 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Dispatcher.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Dispatcher.java @@ -25,6 +25,7 @@ import org.apache.bookkeeper.mledger.impl.PositionImpl; import org.apache.pulsar.broker.service.persistent.DispatchRateLimiter; import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.SubType; +import org.apache.pulsar.common.api.proto.PulsarApi.MessageMetadata; import org.apache.pulsar.common.policies.data.Policies; import org.apache.pulsar.utils.CopyOnWriteArrayList; @@ -82,4 +83,16 @@ default Optional getRateLimiter() { default void initializeDispatchRateLimiterIfNeeded(Optional policies) { //No-op } + + /** + * Check with dispatcher if the message should be added to the delayed delivery tracker. + * Return true if the message should be delayed and ignored at this point. + */ + default boolean trackDelayedDelivery(long ledgerId, long entryId, MessageMetadata msgMetadata) { + return false; + } + + default long getNumberOfDelayedMessages() { + return 0; + } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Subscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Subscription.java index cf4c8627de22c..345ef0b0717c7 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Subscription.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Subscription.java @@ -50,6 +50,10 @@ public interface Subscription { long getNumberOfEntriesInBacklog(); + default long getNumberOfEntriesDelayed() { + return 0; + } + List getConsumers(); CompletableFuture close(); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java index 060b45d9f333e..fad8343bd9084 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java @@ -41,6 +41,7 @@ import org.apache.bookkeeper.mledger.impl.PositionImpl; import org.apache.pulsar.broker.ServiceConfiguration; import org.apache.pulsar.broker.admin.AdminResource; +import org.apache.pulsar.broker.delayed.DelayedDeliveryTracker; import org.apache.pulsar.broker.service.AbstractDispatcherMultipleConsumers; import org.apache.pulsar.broker.service.BrokerServiceException; import org.apache.pulsar.broker.service.BrokerServiceException.ConsumerBusyException; @@ -55,6 +56,7 @@ import org.apache.pulsar.broker.service.persistent.DispatchRateLimiter.Type; import org.apache.pulsar.client.impl.Backoff; import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.SubType; +import org.apache.pulsar.common.api.proto.PulsarApi.MessageMetadata; import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.policies.data.Policies; import org.apache.pulsar.common.util.Codec; @@ -72,9 +74,12 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMu protected final ManagedCursor cursor; private CompletableFuture closeFuture = null; - LongPairSet messagesToReplay = new ConcurrentSortedLongPairSet(128, 2); + LongPairSet messagesToRedeliver = new ConcurrentSortedLongPairSet(128, 2); protected final RedeliveryTracker redeliveryTracker; + private Optional delayedDeliveryTracker = Optional.empty(); + private final boolean isDelayedDeliveryEnabled; + private boolean havePendingRead = false; private boolean havePendingReplayRead = false; private boolean shouldRewindBeforeReadingOrReplaying = false; @@ -109,6 +114,8 @@ public PersistentDispatcherMultipleConsumers(PersistentTopic topic, ManagedCurso this.readBatchSize = serviceConfig.getDispatcherMaxReadBatchSize(); this.maxUnackedMessages = topic.getBrokerService().pulsar().getConfiguration() .getMaxUnackedMessagesPerSubscription(); + this.isDelayedDeliveryEnabled = topic.getBrokerService().pulsar().getConfiguration() + .isDelayedDeliveryEnabled(); this.initializeDispatchRateLimiterIfNeeded(Optional.empty()); } @@ -127,7 +134,7 @@ public synchronized void addConsumer(Consumer consumer) throws BrokerServiceExce cursor.rewind(); shouldRewindBeforeReadingOrReplaying = false; } - messagesToReplay.clear(); + messagesToRedeliver.clear(); } if (isConsumersExceededOnTopic()) { @@ -193,7 +200,7 @@ public synchronized void removeConsumer(Consumer consumer) throws BrokerServiceE havePendingRead = false; } - messagesToReplay.clear(); + messagesToRedeliver.clear(); if (closeFuture != null) { log.info("[{}] All consumers removed. Subscription is disconnected", name); closeFuture.complete(null); @@ -204,7 +211,7 @@ public synchronized void removeConsumer(Consumer consumer) throws BrokerServiceE log.debug("[{}] Consumer are left, reading more entries", name); } consumer.getPendingAcks().forEach((ledgerId, entryId, batchSize, none) -> { - messagesToReplay.add(ledgerId, entryId); + messagesToRedeliver.add(ledgerId, entryId); }); totalAvailablePermits -= consumer.getAvailablePermits(); readMoreEntries(); @@ -282,14 +289,13 @@ public void readMoreEntries() { } - if (!messagesToReplay.isEmpty()) { + if (hasMessagesToReplay()) { if (havePendingReplayRead) { log.debug("[{}] Skipping replay while awaiting previous read to complete", name); return; } - Set messagesToReplayNow = messagesToReplay.items(messagesToRead, - (ledgerId, entryId) -> new PositionImpl(ledgerId, entryId)); + Set messagesToReplayNow = getMessagesToReplayNow(messagesToRead); if (log.isDebugEnabled()) { log.debug("[{}] Schedule replay of {} messages for {} consumers", name, messagesToReplayNow.size(), @@ -301,9 +307,9 @@ public void readMoreEntries() { ReadType.Replay); // clear already acked positions from replay bucket - deletedMessages.forEach(position -> messagesToReplay.remove(((PositionImpl) position).getLedgerId(), + deletedMessages.forEach(position -> messagesToRedeliver.remove(((PositionImpl) position).getLedgerId(), ((PositionImpl) position).getEntryId())); - // if all the entries are acked-entries and cleared up from messagesToReplay, try to read + // if all the entries are acked-entries and cleared up from messagesToRedeliver, try to read // next entries as readCompletedEntries-callback was never called if ((messagesToReplayNow.size() - deletedMessages.size()) == 0) { havePendingReplayRead = false; @@ -348,6 +354,16 @@ public synchronized boolean canUnsubscribe(Consumer consumer) { @Override public CompletableFuture close() { IS_CLOSED_UPDATER.set(this, TRUE); + + Optional delayedDeliveryTracker; + synchronized (this) { + delayedDeliveryTracker = this.delayedDeliveryTracker; + this.delayedDeliveryTracker = Optional.empty(); + } + + if (delayedDeliveryTracker.isPresent()) { + delayedDeliveryTracker.get().close(); + } return disconnectAllConsumers(); } @@ -438,7 +454,7 @@ protected void sendMessagesToConsumers(ReadType readType, List entries) { // remove positions first from replay list first : sendMessages recycles entries if (readType == ReadType.Replay) { entries.subList(start, start + messagesForC).forEach(entry -> { - messagesToReplay.remove(entry.getLedgerId(), entry.getEntryId()); + messagesToRedeliver.remove(entry.getLedgerId(), entry.getEntryId()); }); } @@ -477,7 +493,7 @@ protected void sendMessagesToConsumers(ReadType readType, List entries) { entries.size() - start); } entries.subList(start, entries.size()).forEach(entry -> { - messagesToReplay.add(entry.getLedgerId(), entry.getEntryId()); + messagesToRedeliver.add(entry.getLedgerId(), entry.getEntryId()); entry.release(); }); } @@ -517,7 +533,7 @@ public synchronized void readEntriesFailed(ManagedLedgerException exception, Obj if (exception instanceof ManagedLedgerException.InvalidReplayPositionException) { PositionImpl markDeletePosition = (PositionImpl) cursor.getMarkDeletedPosition(); - messagesToReplay.removeIf((ledgerId, entryId) -> { + messagesToRedeliver.removeIf((ledgerId, entryId) -> { return ComparisonChain.start().compare(ledgerId, markDeletePosition.getLedgerId()) .compare(entryId, markDeletePosition.getEntryId()).result() <= 0; }); @@ -566,11 +582,11 @@ public boolean isConsumerAvailable(Consumer consumer) { @Override public synchronized void redeliverUnacknowledgedMessages(Consumer consumer) { consumer.getPendingAcks().forEach((ledgerId, entryId, batchSize, none) -> { - messagesToReplay.add(ledgerId, entryId); + messagesToRedeliver.add(ledgerId, entryId); }); if (log.isDebugEnabled()) { log.debug("[{}-{}] Redelivering unacknowledged messages for consumer {}", name, consumer, - messagesToReplay); + messagesToRedeliver); } readMoreEntries(); } @@ -578,7 +594,7 @@ public synchronized void redeliverUnacknowledgedMessages(Consumer consumer) { @Override public synchronized void redeliverUnacknowledgedMessages(Consumer consumer, List positions) { positions.forEach(position -> { - messagesToReplay.add(position.getLedgerId(), position.getEntryId()); + messagesToRedeliver.add(position.getLedgerId(), position.getEntryId()); redeliveryTracker.incrementAndGetRedeliveryCount(position); }); if (log.isDebugEnabled()) { @@ -658,5 +674,58 @@ public void initializeDispatchRateLimiterIfNeeded(Optional policies) { } } + @Override + public synchronized boolean trackDelayedDelivery(long ledgerId, long entryId, MessageMetadata msgMetadata) { + if (!isDelayedDeliveryEnabled) { + // If broker has the feature disabled, always deliver messages immediately + return false; + } + + if (!delayedDeliveryTracker.isPresent()) { + // Initialize the tracker the first time we need to use it + delayedDeliveryTracker = Optional.of(topic.getBrokerService().getDelayedDeliveryTrackerFactory().newTracker(this)); + } + + return delayedDeliveryTracker.get().addMessage(ledgerId, entryId, msgMetadata.getDeliverAtTime()); + } + + /** + * Returns whether we have any message that could be immediately replayed. + * This could be a message that was requested to be re-delivered or a delayed + * delivery. + */ + private boolean hasMessagesToReplay() { + if (!messagesToRedeliver.isEmpty()) { + return true; + } + + if (delayedDeliveryTracker.isPresent() && delayedDeliveryTracker.get().hasMessageAvailable()) { + return true; + } + + return false; + } + + private synchronized Set getMessagesToReplayNow(int maxMessagesToRead) { + if (!messagesToRedeliver.isEmpty()) { + return messagesToRedeliver.items(maxMessagesToRead, + (ledgerId, entryId) -> new PositionImpl(ledgerId, entryId)); + } else { + return delayedDeliveryTracker.get().getScheduledMessages(maxMessagesToRead); + } + } + + public synchronized long getNumberOfDelayedMessages() { + if (delayedDeliveryTracker.isPresent()) { + return delayedDeliveryTracker.get().getNumberOfDelayedMessages(); + } else { + return 0; + } + } + + public PersistentTopic getTopic() { + return topic; + } + private static final Logger log = LoggerFactory.getLogger(PersistentDispatcherMultipleConsumers.class); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java index a49b41e17e2ad..ca4f52587385b 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java @@ -102,7 +102,7 @@ protected void sendMessagesToConsumers(ReadType readType, List entries) { // remove positions first from replay list first : sendMessages recycles entries List subList = new ArrayList<>(entriesWithSameKey.getValue().subList(0, messagesForC)); if (readType == ReadType.Replay) { - subList.forEach(entry -> messagesToReplay.remove(entry.getLedgerId(), entry.getEntryId())); + subList.forEach(entry -> messagesToRedeliver.remove(entry.getLedgerId(), entry.getEntryId())); } SendMessageInfo sendMessageInfo = SendMessageInfo.getThreadLocal(); @@ -143,7 +143,7 @@ protected void sendMessagesToConsumers(ReadType readType, List entries) { for (List entryList : groupedEntries.values()) { laterReplay += entryList.size(); entryList.forEach(entry -> { - messagesToReplay.add(entry.getLedgerId(), entry.getEntryId()); + messagesToRedeliver.add(entry.getLedgerId(), entry.getEntryId()); entry.release(); }); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java index 282d6e31261c7..8cb97c5b1d6bf 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java @@ -672,10 +672,10 @@ public SubscriptionStats getStats() { } if (SubType.Shared.equals(subStats.type)) { if (dispatcher instanceof PersistentDispatcherMultipleConsumers) { - subStats.unackedMessages = ((PersistentDispatcherMultipleConsumers) dispatcher) - .getTotalUnackedMessages(); - subStats.blockedSubscriptionOnUnackedMsgs = ((PersistentDispatcherMultipleConsumers) dispatcher) - .isBlockedDispatcherOnUnackedMsgs(); + PersistentDispatcherMultipleConsumers d = (PersistentDispatcherMultipleConsumers) dispatcher; + subStats.unackedMessages = d.getTotalUnackedMessages(); + subStats.blockedSubscriptionOnUnackedMsgs = d.isBlockedDispatcherOnUnackedMsgs(); + subStats.msgDelayed = d.getNumberOfDelayedMessages(); } } subStats.msgBacklog = getNumberOfEntriesInBacklog(); @@ -699,6 +699,15 @@ public void addUnAckedMessages(int unAckMessages) { dispatcher.addUnAckedMessages(unAckMessages); } + @Override + public synchronized long getNumberOfEntriesDelayed() { + if (dispatcher != null) { + return dispatcher.getNumberOfDelayedMessages(); + } else { + return 0; + } + } + @Override public void markTopicWithBatchMessagePublished() { topic.markBatchMessagePublished(); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedNamespaceStats.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedNamespaceStats.java index b2a343a37ff42..272cbb4c9e577 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedNamespaceStats.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedNamespaceStats.java @@ -91,6 +91,7 @@ void updateStats(TopicStats stats) { subscriptionStats.computeIfAbsent(n, k -> new AggregatedSubscriptionStats()); subsStats.blockedSubscriptionOnUnackedMsgs = as.blockedSubscriptionOnUnackedMsgs; subsStats.msgBacklog += as.msgBacklog; + subsStats.msgDelayed += as.msgDelayed; subsStats.msgRateRedeliver += as.msgRateRedeliver; subsStats.unackedMessages += as.unackedMessages; as.consumerStat.forEach((c, v) -> { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedSubscriptionStats.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedSubscriptionStats.java index c46bbf5a2be44..1f3c51302e980 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedSubscriptionStats.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedSubscriptionStats.java @@ -37,5 +37,7 @@ public class AggregatedSubscriptionStats { public double msgThroughputOut; + public long msgDelayed; + public Map consumerStat = new HashMap<>(); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java index bf04b195e4f39..89f4a2c908e8a 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java @@ -125,6 +125,7 @@ private static void getTopicStats(Topic topic, TopicStats stats, boolean include AggregatedSubscriptionStats subsStats = stats.subscriptionStats .computeIfAbsent(name, k -> new AggregatedSubscriptionStats()); subsStats.msgBacklog = subscription.getNumberOfEntriesInBacklog(); + subsStats.msgDelayed = subscription.getNumberOfEntriesDelayed(); subscription.getConsumers().forEach(consumer -> { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TopicStats.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TopicStats.java index 7f65a3da73c01..17ab714e7dbf6 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TopicStats.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TopicStats.java @@ -122,6 +122,7 @@ static void printTopicStats(SimpleTextOutputStream stream, String cluster, Strin stats.subscriptionStats.forEach((n, subsStats) -> { metric(stream, cluster, namespace, topic, n, "pulsar_subscription_back_log", subsStats.msgBacklog); + metric(stream, cluster, namespace, topic, n, "pulsar_subscription_delayed", subsStats.msgDelayed); metric(stream, cluster, namespace, topic, n, "pulsar_subscription_msg_rate_redeliver", subsStats.msgRateRedeliver); metric(stream, cluster, namespace, topic, n, "pulsar_subscription_unacked_massages", subsStats.unackedMessages); metric(stream, cluster, namespace, topic, n, "pulsar_subscription_blocked_on_unacked_messages", subsStats.blockedSubscriptionOnUnackedMsgs ? 1 : 0); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/InMemoryDeliveryTrackerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/InMemoryDeliveryTrackerTest.java new file mode 100644 index 0000000000000..fbfc4d2d8b1aa --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/InMemoryDeliveryTrackerTest.java @@ -0,0 +1,156 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.delayed; + +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.*; +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertFalse; +import static org.testng.Assert.assertTrue; + +import io.netty.util.Timeout; +import io.netty.util.Timer; +import io.netty.util.TimerTask; + +import java.time.Clock; +import java.util.Collections; +import java.util.NavigableMap; +import java.util.Set; +import java.util.TreeMap; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; + +import lombok.Cleanup; + +import org.apache.bookkeeper.mledger.impl.PositionImpl; +import org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers; +import org.testng.annotations.Test; + +public class InMemoryDeliveryTrackerTest { + + @Test + public void test() throws Exception { + PersistentDispatcherMultipleConsumers dispatcher = mock(PersistentDispatcherMultipleConsumers.class); + + Timer timer = mock(Timer.class); + + AtomicLong clockTime = new AtomicLong(); + Clock clock = mock(Clock.class); + when(clock.millis()).then(x -> clockTime.get()); + + @Cleanup + InMemoryDelayedDeliveryTracker tracker = new InMemoryDelayedDeliveryTracker(dispatcher, timer, 1, clock); + + assertEquals(tracker.hasMessageAvailable(), false); + + assertTrue(tracker.addMessage(2, 2, 20)); + assertTrue(tracker.addMessage(1, 1, 10)); + assertTrue(tracker.addMessage(3, 3, 30)); + assertTrue(tracker.addMessage(5, 5, 50)); + assertTrue(tracker.addMessage(4, 4, 40)); + + assertEquals(tracker.hasMessageAvailable(), false); + assertEquals(tracker.getNumberOfDelayedMessages(), 5); + + assertEquals(tracker.getScheduledMessages(10), Collections.emptySet()); + + // Move time forward + clockTime.set(15); + + // Message is rejected by tracker since it's already ready to send + assertFalse(tracker.addMessage(6, 6, 10)); + + assertEquals(tracker.getNumberOfDelayedMessages(), 5); + assertEquals(tracker.hasMessageAvailable(), true); + Set scheduled = tracker.getScheduledMessages(10); + assertEquals(scheduled.size(), 1); + + // Move time forward + clockTime.set(60); + + assertEquals(tracker.getNumberOfDelayedMessages(), 4); + assertEquals(tracker.hasMessageAvailable(), true); + scheduled = tracker.getScheduledMessages(1); + assertEquals(scheduled.size(), 1); + + assertEquals(tracker.getNumberOfDelayedMessages(), 3); + assertEquals(tracker.hasMessageAvailable(), true); + scheduled = tracker.getScheduledMessages(3); + assertEquals(scheduled.size(), 3); + + assertEquals(tracker.getNumberOfDelayedMessages(), 0); + assertEquals(tracker.hasMessageAvailable(), false); + assertEquals(tracker.getScheduledMessages(10), Collections.emptySet()); + } + + @Test + public void testWithTimer() throws Exception { + PersistentDispatcherMultipleConsumers dispatcher = mock(PersistentDispatcherMultipleConsumers.class); + Timer timer = mock(Timer.class); + + AtomicLong clockTime = new AtomicLong(); + Clock clock = mock(Clock.class); + when(clock.millis()).then(x -> clockTime.get()); + + NavigableMap tasks = new TreeMap<>(); + + when(timer.newTimeout(any(), anyLong(), any())).then(invocation -> { + TimerTask task = invocation.getArgumentAt(0, TimerTask.class); + long timeout = invocation.getArgumentAt(1, Long.class); + TimeUnit unit = invocation.getArgumentAt(2, TimeUnit.class); + long scheduleAt = clockTime.get() + unit.toMillis(timeout); + tasks.put(scheduleAt, task); + + Timeout t = mock(Timeout.class); + when(t.cancel()).then(i -> { + tasks.remove(scheduleAt, task); + return null; + }); + return t; + }); + + @Cleanup + InMemoryDelayedDeliveryTracker tracker = new InMemoryDelayedDeliveryTracker(dispatcher, timer, 1, clock); + + assertTrue(tasks.isEmpty()); + assertTrue(tracker.addMessage(2, 2, 20)); + assertEquals(tasks.size(), 1); + assertEquals(tasks.firstKey().longValue(), 20); + + assertTrue(tracker.addMessage(1, 1, 10)); + assertEquals(tasks.size(), 1); + assertEquals(tasks.firstKey().longValue(), 10); + + assertTrue(tracker.addMessage(3, 3, 30)); + assertEquals(tasks.size(), 1); + assertEquals(tasks.firstKey().longValue(), 10); + + clockTime.set(15); + + TimerTask task = tasks.pollFirstEntry().getValue(); + Timeout cancelledTimeout = mock(Timeout.class); + when(cancelledTimeout.isCancelled()).thenReturn(true); + task.run(cancelledTimeout); + verifyZeroInteractions(dispatcher); + + task.run(mock(Timeout.class)); + verify(dispatcher).readMoreEntries(); + } +} diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicE2ETest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicE2ETest.java index 8aaee86e3d72f..da42cdb78e1c4 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicE2ETest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicE2ETest.java @@ -1384,7 +1384,7 @@ public void testMessageReplay() throws Exception { PersistentSubscription subRef = topicRef.getSubscription(subName); PersistentDispatcherMultipleConsumers dispatcher = (PersistentDispatcherMultipleConsumers) subRef .getDispatcher(); - Field replayMap = PersistentDispatcherMultipleConsumers.class.getDeclaredField("messagesToReplay"); + Field replayMap = PersistentDispatcherMultipleConsumers.class.getDeclaredField("messagesToRedeliver"); replayMap.setAccessible(true); ConcurrentLongPairSet messagesToReplay = new ConcurrentLongPairSet(64, 1); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/DelayedDeliveryTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/DelayedDeliveryTest.java new file mode 100644 index 0000000000000..de8c6a61be900 --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/DelayedDeliveryTest.java @@ -0,0 +1,210 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.service.persistent; + +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertTrue; + +import java.util.Set; +import java.util.TreeSet; +import java.util.concurrent.TimeUnit; + +import lombok.Cleanup; + +import org.apache.pulsar.client.api.Consumer; +import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.Producer; +import org.apache.pulsar.client.api.ProducerConsumerBase; +import org.apache.pulsar.client.api.Schema; +import org.apache.pulsar.client.api.SubscriptionType; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +public class DelayedDeliveryTest extends ProducerConsumerBase { + + @Override + @BeforeClass + public void setup() throws Exception { + super.internalSetup(); + super.producerBaseSetup(); + } + + @Override + @AfterClass + public void cleanup() throws Exception { + super.internalCleanup(); + } + + @Test + public void testDelayedDelivery() + throws Exception { + String topic = "testNegativeAcks-" + System.nanoTime(); + + @Cleanup + Consumer failoverConsumer = pulsarClient.newConsumer(Schema.STRING) + .topic(topic) + .subscriptionName("failover-sub") + .subscriptionType(SubscriptionType.Failover) + .subscribe(); + + @Cleanup + Consumer sharedConsumer = pulsarClient.newConsumer(Schema.STRING) + .topic(topic) + .subscriptionName("shared-sub") + .subscriptionType(SubscriptionType.Shared) + .subscribe(); + + @Cleanup + Producer producer = pulsarClient.newProducer(Schema.STRING) + .topic(topic) + .create(); + + for (int i = 0; i < 10; i++) { + producer.newMessage() + .value("msg-" + i) + .deliverAfter(5, TimeUnit.SECONDS) + .sendAsync(); + } + + producer.flush(); + + // Failover consumer will receive the messages immediately while + // the shared consumer will get them after the delay + Message msg = sharedConsumer.receive(100, TimeUnit.MILLISECONDS); + assertEquals(msg, null); + + for (int i = 0; i < 10; i++) { + msg = failoverConsumer.receive(100, TimeUnit.MILLISECONDS); + assertEquals(msg.getValue(), "msg-" + i); + } + + Set receivedMsgs = new TreeSet<>(); + for (int i = 0; i < 10; i++) { + msg = sharedConsumer.receive(10, TimeUnit.SECONDS); + receivedMsgs.add(msg.getValue()); + } + + assertEquals(receivedMsgs.size(), 10); + for (int i = 0; i < 10; i++) { + assertTrue(receivedMsgs.contains("msg-" + i)); + } + } + + @Test + public void testInterleavedMessages() + throws Exception { + String topic = "testInterleavedMessages-" + System.nanoTime(); + + @Cleanup + Consumer consumer = pulsarClient.newConsumer(Schema.STRING) + .topic(topic) + .subscriptionName("shared-sub") + .subscriptionType(SubscriptionType.Shared) + .subscribe(); + + @Cleanup + Producer producer = pulsarClient.newProducer(Schema.STRING) + .topic(topic) + .create(); + + for (int i = 0; i < 10; i++) { + // Publish 1 message without delay and 1 with delay + producer.newMessage() + .value("immediate-msg-" + i) + .sendAsync(); + + producer.newMessage() + .value("delayed-msg-" + i) + .deliverAfter(5, TimeUnit.SECONDS) + .sendAsync(); + } + + producer.flush(); + + // Failover consumer will receive the messages immediately while + // the shared consumer will get them after the delay + for (int i = 0; i < 10; i++) { + Message msg = consumer.receive(100, TimeUnit.MILLISECONDS); + assertEquals(msg.getValue(), "immediate-msg-" + i); + consumer.acknowledge(msg); + } + + // Delayed messages might not come in same exact order + Set delayedMessages = new TreeSet<>(); + for (int i = 0; i < 10; i++) { + Message msg = consumer.receive(10, TimeUnit.SECONDS); + delayedMessages.add(msg.getValue()); + consumer.acknowledge(msg); + } + + for (int i = 0; i < 10; i++) { + assertTrue(delayedMessages.contains("delayed-msg-" + i)); + } + } + + @Test + public void testEverythingFilteredInMultipleReads() + throws Exception { + String topic = "testEverythingFilteredInMultipleReads-" + System.nanoTime(); + + @Cleanup + Consumer sharedConsumer = pulsarClient.newConsumer(Schema.STRING) + .topic(topic) + .subscriptionName("shared-sub") + .subscriptionType(SubscriptionType.Shared) + .subscribe(); + + @Cleanup + Producer producer = pulsarClient.newProducer(Schema.STRING) + .topic(topic) + .create(); + + for (int i = 0; i < 10; i++) { + producer.newMessage() + .value("msg-" + i) + .deliverAfter(5, TimeUnit.SECONDS) + .send(); + } + + Thread.sleep(1000); + + // Write a 2nd batch of messages + for (int i = 10; i < 20; i++) { + producer.newMessage() + .value("msg-" + i) + .deliverAfter(5, TimeUnit.SECONDS) + .send(); + } + + Message msg = sharedConsumer.receive(100, TimeUnit.MILLISECONDS); + assertEquals(msg, null); + + Set receivedMsgs = new TreeSet<>(); + for (int i = 0; i < 20; i++) { + msg = sharedConsumer.receive(10, TimeUnit.SECONDS); + receivedMsgs.add(msg.getValue()); + } + + assertEquals(receivedMsgs.size(), 20); + for (int i = 0; i < 10; i++) { + assertTrue(receivedMsgs.contains("msg-" + i)); + } + } +} diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/TypedMessageBuilder.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/TypedMessageBuilder.java index 824a9678fb1b7..7dd688d9b37c7 100644 --- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/TypedMessageBuilder.java +++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/TypedMessageBuilder.java @@ -22,6 +22,7 @@ import java.util.List; import java.util.Map; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; /** * Message builder that constructs a message to be published through a producer. @@ -184,6 +185,36 @@ public interface TypedMessageBuilder extends Serializable { */ TypedMessageBuilder disableReplication(); + /** + * Deliver the message only at or after the specified absolute timestamp. + *

+ * The timestamp is milliseconds and based on UTC (eg: {@link System#currentTimeMillis()}. + *

+ * Note: messages are only delivered with delay when a consumer is consuming + * through a {@link SubscriptionType#Shared} subscription. With other subscription + * types, the messages will still be delivered immediately. + * + * @param timestamp + * absolute timestamp indicating when the message should be delivered to consumers + * @return the message builder instance + */ + TypedMessageBuilder deliverAt(long timestamp); + + /** + * Request to deliver the message only after the specified relative delay. + *

+ * Note: messages are only delivered with delay when a consumer is consuming + * through a {@link SubscriptionType#Shared} subscription. With other subscription + * types, the messages will still be delivered immediately. + * + * @param delay + * the amount of delay before the message will be delivered + * @param unit + * the time unit for the delay + * @return the message builder instance + */ + TypedMessageBuilder deliverAfter(long delay, TimeUnit unit); + /** * Configure the {@link TypedMessageBuilder} from a config map, as an alternative compared * to call the individual builder methods. @@ -212,6 +243,8 @@ public interface TypedMessageBuilder extends Serializable { * {@link #CONF_SEQUENCE_ID}{@code sequenceId}{@code long}{@link #sequenceId(long)} * {@link #CONF_REPLICATION_CLUSTERS}{@code replicationClusters}{@code List}{@link #replicationClusters(List)} * {@link #CONF_DISABLE_REPLICATION}{@code disableReplication}{@code boolean}{@link #disableReplication()} + * {@link #CONF_DELIVERY_AFTER_SECONDS}{@code deliverAfterSeconds}{@code long}{@link #deliverAfter(long, TimeUnit)} + * {@link #CONF_DELIVERY_AT}{@code deliverAt}{@code long}{@link #deliverAt(long)} * * * @param config a map with the configuration options for the message @@ -225,4 +258,6 @@ public interface TypedMessageBuilder extends Serializable { static final String CONF_SEQUENCE_ID = "sequenceId"; static final String CONF_REPLICATION_CLUSTERS = "replicationClusters"; static final String CONF_DISABLE_REPLICATION = "disableReplication"; + static final String CONF_DELIVERY_AFTER_SECONDS = "deliverAfterSeconds"; + static final String CONF_DELIVERY_AT = "deliverAt"; } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java index 3e32fb91d2e3e..fecdc1f2bd24e 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java @@ -304,8 +304,9 @@ public void sendAsync(Message message, SendCallback callback) { // If compression is enabled, we are compressing, otherwise it will simply use the same buffer int uncompressedSize = payload.readableBytes(); ByteBuf compressedPayload = payload; - // batch will be compressed when closed - if (!isBatchMessagingEnabled()) { + // Batch will be compressed when closed + // If a message has a delayed delivery time, we'll always send it individually + if (!isBatchMessagingEnabled() || msgMetadataBuilder.hasDeliverAtTime()) { compressedPayload = compressor.encode(payload); payload.release(); @@ -359,7 +360,7 @@ public void sendAsync(Message message, SendCallback callback) { msgMetadataBuilder.setUncompressedSize(uncompressedSize); } - if (isBatchMessagingEnabled()) { + if (isBatchMessagingEnabled() && !msgMetadataBuilder.hasDeliverAtTime()) { // handle boundary cases where message being added would exceed // batch size and/or max message size if (batchMessageContainer.hasSpaceInBatch(msg)) { diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TypedMessageBuilderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TypedMessageBuilderImpl.java index e7404ab25048d..da4e193228c26 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TypedMessageBuilderImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TypedMessageBuilderImpl.java @@ -28,6 +28,7 @@ import java.util.List; import java.util.Map; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; import org.apache.pulsar.client.api.Message; import org.apache.pulsar.client.api.MessageId; @@ -165,6 +166,17 @@ public TypedMessageBuilder disableReplication() { return this; } + @Override + public TypedMessageBuilder deliverAfter(long delay, TimeUnit unit) { + return deliverAt(System.currentTimeMillis() + unit.toMillis(delay)); + } + + @Override + public TypedMessageBuilder deliverAt(long timestamp) { + msgMetadataBuilder.setDeliverAtTime(timestamp); + return this; + } + @SuppressWarnings("unchecked") @Override public TypedMessageBuilder loadConf(Map config) { @@ -184,6 +196,10 @@ public TypedMessageBuilder loadConf(Map config) { if (disableReplication) { this.disableReplication(); } + } else if (key.equals(CONF_DELIVERY_AFTER_SECONDS)) { + this.deliverAfter(checkType(value, Long.class), TimeUnit.SECONDS); + } else if (key.equals(CONF_DELIVERY_AT)) { + this.deliverAt(checkType(value, Long.class)); } else { throw new RuntimeException("Invalid message config key '" + key + "'"); } diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java b/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java index ffe15673cce5c..3dcc367bf1859 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java @@ -3096,6 +3096,10 @@ public interface MessageMetadataOrBuilder boolean hasOrderingKey(); org.apache.pulsar.shaded.com.google.protobuf.v241.ByteString getOrderingKey(); + // optional int64 deliver_at_time = 19; + boolean hasDeliverAtTime(); + long getDeliverAtTime(); + // optional int32 marker_type = 20; boolean hasMarkerType(); int getMarkerType(); @@ -3419,11 +3423,21 @@ public org.apache.pulsar.shaded.com.google.protobuf.v241.ByteString getOrderingK return orderingKey_; } + // optional int64 deliver_at_time = 19; + public static final int DELIVER_AT_TIME_FIELD_NUMBER = 19; + private long deliverAtTime_; + public boolean hasDeliverAtTime() { + return ((bitField0_ & 0x00004000) == 0x00004000); + } + public long getDeliverAtTime() { + return deliverAtTime_; + } + // optional int32 marker_type = 20; public static final int MARKER_TYPE_FIELD_NUMBER = 20; private int markerType_; public boolean hasMarkerType() { - return ((bitField0_ & 0x00004000) == 0x00004000); + return ((bitField0_ & 0x00008000) == 0x00008000); } public int getMarkerType() { return markerType_; @@ -3447,6 +3461,7 @@ private void initFields() { schemaVersion_ = org.apache.pulsar.shaded.com.google.protobuf.v241.ByteString.EMPTY; partitionKeyB64Encoded_ = false; orderingKey_ = org.apache.pulsar.shaded.com.google.protobuf.v241.ByteString.EMPTY; + deliverAtTime_ = 0L; markerType_ = 0; } private byte memoizedIsInitialized = -1; @@ -3542,6 +3557,9 @@ public void writeTo(org.apache.pulsar.common.util.protobuf.ByteBufCodedOutputStr output.writeBytes(18, orderingKey_); } if (((bitField0_ & 0x00004000) == 0x00004000)) { + output.writeInt64(19, deliverAtTime_); + } + if (((bitField0_ & 0x00008000) == 0x00008000)) { output.writeInt32(20, markerType_); } } @@ -3626,6 +3644,10 @@ public int getSerializedSize() { .computeBytesSize(18, orderingKey_); } if (((bitField0_ & 0x00004000) == 0x00004000)) { + size += org.apache.pulsar.shaded.com.google.protobuf.v241.CodedOutputStream + .computeInt64Size(19, deliverAtTime_); + } + if (((bitField0_ & 0x00008000) == 0x00008000)) { size += org.apache.pulsar.shaded.com.google.protobuf.v241.CodedOutputStream .computeInt32Size(20, markerType_); } @@ -3776,8 +3798,10 @@ public Builder clear() { bitField0_ = (bitField0_ & ~0x00008000); orderingKey_ = org.apache.pulsar.shaded.com.google.protobuf.v241.ByteString.EMPTY; bitField0_ = (bitField0_ & ~0x00010000); - markerType_ = 0; + deliverAtTime_ = 0L; bitField0_ = (bitField0_ & ~0x00020000); + markerType_ = 0; + bitField0_ = (bitField0_ & ~0x00040000); return this; } @@ -3886,6 +3910,10 @@ public org.apache.pulsar.common.api.proto.PulsarApi.MessageMetadata buildPartial if (((from_bitField0_ & 0x00020000) == 0x00020000)) { to_bitField0_ |= 0x00004000; } + result.deliverAtTime_ = deliverAtTime_; + if (((from_bitField0_ & 0x00040000) == 0x00040000)) { + to_bitField0_ |= 0x00008000; + } result.markerType_ = markerType_; result.bitField0_ = to_bitField0_; return result; @@ -3965,6 +3993,9 @@ public Builder mergeFrom(org.apache.pulsar.common.api.proto.PulsarApi.MessageMet if (other.hasOrderingKey()) { setOrderingKey(other.getOrderingKey()); } + if (other.hasDeliverAtTime()) { + setDeliverAtTime(other.getDeliverAtTime()); + } if (other.hasMarkerType()) { setMarkerType(other.getMarkerType()); } @@ -4112,8 +4143,13 @@ public Builder mergeFrom( orderingKey_ = input.readBytes(); break; } - case 160: { + case 152: { bitField0_ |= 0x00020000; + deliverAtTime_ = input.readInt64(); + break; + } + case 160: { + bitField0_ |= 0x00040000; markerType_ = input.readInt32(); break; } @@ -4723,22 +4759,43 @@ public Builder clearOrderingKey() { return this; } + // optional int64 deliver_at_time = 19; + private long deliverAtTime_ ; + public boolean hasDeliverAtTime() { + return ((bitField0_ & 0x00020000) == 0x00020000); + } + public long getDeliverAtTime() { + return deliverAtTime_; + } + public Builder setDeliverAtTime(long value) { + bitField0_ |= 0x00020000; + deliverAtTime_ = value; + + return this; + } + public Builder clearDeliverAtTime() { + bitField0_ = (bitField0_ & ~0x00020000); + deliverAtTime_ = 0L; + + return this; + } + // optional int32 marker_type = 20; private int markerType_ ; public boolean hasMarkerType() { - return ((bitField0_ & 0x00020000) == 0x00020000); + return ((bitField0_ & 0x00040000) == 0x00040000); } public int getMarkerType() { return markerType_; } public Builder setMarkerType(int value) { - bitField0_ |= 0x00020000; + bitField0_ |= 0x00040000; markerType_ = value; return this; } public Builder clearMarkerType() { - bitField0_ = (bitField0_ & ~0x00020000); + bitField0_ = (bitField0_ & ~0x00040000); markerType_ = 0; return this; diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/SubscriptionStats.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/SubscriptionStats.java index 40e3dadc2d7c2..7cdf514ebc8b2 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/SubscriptionStats.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/SubscriptionStats.java @@ -44,6 +44,9 @@ public class SubscriptionStats { /** Flag to verify if subscription is blocked due to reaching threshold of unacked messages */ public boolean blockedSubscriptionOnUnackedMsgs; + /** Number of delayed messages currently being tracked */ + public long msgDelayed; + /** Number of unacknowledged messages for the subscription */ public long unackedMessages; diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/TripleLongPriorityQueue.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/TripleLongPriorityQueue.java new file mode 100644 index 0000000000000..9d4b88e842c6a --- /dev/null +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/TripleLongPriorityQueue.java @@ -0,0 +1,225 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.common.util.collections; + +import static com.google.common.base.Preconditions.checkArgument; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.PooledByteBufAllocator; + +/** + * Provides a priority-queue implementation specialized on items composed by 3 longs. + * + * This class is not thread safe and the items are stored in direct memory. + */ +public class TripleLongPriorityQueue implements AutoCloseable { + + private static final int SIZE_OF_LONG = 8; + private static final int DEFAULT_INITIAL_CAPACITY = 16; + + // Each item is composed of 3 longs + private static final int ITEMS_COUNT = 3; + + private static final int TUPLE_SIZE = ITEMS_COUNT * SIZE_OF_LONG; + + private final ByteBuf buffer; + + private int capacity; + private int size; + + /** + * Create a new priority queue with default initial capacity + */ + public TripleLongPriorityQueue() { + this(DEFAULT_INITIAL_CAPACITY); + } + + /** + * Create a new priority queue with a given initial capacity + * @param initialCapacity + */ + public TripleLongPriorityQueue(int initialCapacity) { + capacity = initialCapacity; + buffer = PooledByteBufAllocator.DEFAULT.directBuffer(initialCapacity * ITEMS_COUNT * SIZE_OF_LONG); + size = 0; + } + + /** + * Close the priority queue and free the memory associated + */ + @Override + public void close() { + buffer.release(); + } + + /** + * Add a tuple of 3 long items to the priority queue + * + * @param n1 + * @param n2 + * @param n3 + */ + public void add(long n1, long n2, long n3) { + if (size == capacity) { + increaseCapacity(); + } + + put(size, n1, n2, n3); + siftUp(size); + ++size; + } + + /** + * Read the 1st long item in the top tuple in the priority queue. + *

+ * The tuple will not be extracted + */ + public long peekN1() { + checkArgument(size != 0); + return buffer.getLong(0); + } + + /** + * Read the 2nd long item in the top tuple in the priority queue. + *

+ * The tuple will not be extracted + */ + public long peekN2() { + checkArgument(size != 0); + return buffer.getLong(0 + 1 * SIZE_OF_LONG); + } + + /** + * Read the 3rd long item in the top tuple in the priority queue. + *

+ * The tuple will not be extracted + */ + public long peekN3() { + checkArgument(size != 0); + return buffer.getLong(0 + 2 * SIZE_OF_LONG); + } + + /** + * Removes the first item from the queue. + */ + public void pop() { + checkArgument(size != 0); + swap(0, size - 1); + size--; + siftDown(0); + } + + /** + * Returns whether the priority queue is empty + */ + public boolean isEmpty() { + return size == 0; + } + + /** + * Returns the number of tuples in the priority queue + */ + public int size() { + return size; + } + + private void increaseCapacity() { + // For bigger sizes, increase by 50% + this.capacity += (capacity <= 256 ? capacity : capacity / 2); + buffer.capacity(this.capacity * TUPLE_SIZE); + } + + private void siftUp(int idx) { + while (idx > 0) { + int parentIdx = (idx - 1) / 2; + if (compare(idx, parentIdx) >= 0) { + break; + } + + swap(idx, parentIdx); + idx = parentIdx; + } + } + + private void siftDown(int idx) { + int half = size / 2; + while (idx < half) { + int left = 2 * idx + 1; + int right = 2 * idx + 2; + + int swapIdx = idx; + + if (compare(idx, left) > 0) { + swapIdx = left; + } + + if (right < size && compare(swapIdx, right) > 0) { + swapIdx = right; + } + + if (swapIdx == idx) { + return; + } + + swap(idx, swapIdx); + idx = swapIdx; + } + } + + private void put(int idx, long n1, long n2, long n3) { + int i = idx * TUPLE_SIZE; + buffer.setLong(i, n1); + buffer.setLong(i + 1 * SIZE_OF_LONG, n2); + buffer.setLong(i + 2 * SIZE_OF_LONG, n3); + } + + private int compare(int idx1, int idx2) { + int i1 = idx1 * TUPLE_SIZE; + int i2 = idx2 * TUPLE_SIZE; + + int c1 = Long.compare(buffer.getLong(i1), buffer.getLong(i2)); + if (c1 != 0) { + return c1; + } + + int c2 = Long.compare(buffer.getLong(i1 + SIZE_OF_LONG), buffer.getLong(i2 + SIZE_OF_LONG)); + if (c2 != 0) { + return c2; + } + + return Long.compare(buffer.getLong(i1 + 2 * SIZE_OF_LONG), buffer.getLong(i2 + 2 * SIZE_OF_LONG)); + } + + private void swap(int idx1, int idx2) { + int i1 = idx1 * TUPLE_SIZE; + int i2 = idx2 * TUPLE_SIZE; + + long tmp1 = buffer.getLong(i1); + long tmp2 = buffer.getLong(i1 + 1 * SIZE_OF_LONG); + long tmp3 = buffer.getLong(i1 + 2 * SIZE_OF_LONG); + + buffer.setLong(i1, buffer.getLong(i2)); + buffer.setLong(i1 + 1 * SIZE_OF_LONG, buffer.getLong(i2 + 1 * SIZE_OF_LONG)); + buffer.setLong(i1 + 2 * SIZE_OF_LONG, buffer.getLong(i2 + 2 * SIZE_OF_LONG)); + + buffer.setLong(i2, tmp1); + buffer.setLong(i2 + 1 * SIZE_OF_LONG, tmp2); + buffer.setLong(i2 + 2 * SIZE_OF_LONG, tmp3); + } +} diff --git a/pulsar-common/src/main/proto/PulsarApi.proto b/pulsar-common/src/main/proto/PulsarApi.proto index 996ea21913205..d6db53a003f36 100644 --- a/pulsar-common/src/main/proto/PulsarApi.proto +++ b/pulsar-common/src/main/proto/PulsarApi.proto @@ -111,9 +111,13 @@ message MessageMetadata { // Additional parameters required by encryption optional bytes encryption_param = 15; optional bytes schema_version = 16; + optional bool partition_key_b64_encoded = 17 [ default = false ]; // Specific a key to overwrite the message key which used for ordering dispatch in Key_Shared mode. optional bytes ordering_key = 18; + + // Mark the message to be delivered at or after the specified timestamp + optional int64 deliver_at_time = 19; // Identify whether a message is a "marker" message used for // internal metadata instead of application published data. diff --git a/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/TripleLongPriorityQueueTest.java b/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/TripleLongPriorityQueueTest.java new file mode 100644 index 0000000000000..ea04a6d372f7a --- /dev/null +++ b/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/TripleLongPriorityQueueTest.java @@ -0,0 +1,137 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.common.util.collections; + +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertFalse; +import static org.testng.Assert.fail; + +import org.testng.annotations.Test; + +public class TripleLongPriorityQueueTest { + + @Test + public void testQueue() { + TripleLongPriorityQueue pq = new TripleLongPriorityQueue(); + assertEquals(pq.size(), 0); + + final int N = 1000; + + for (int i = N; i > 0; i--) { + pq.add(i, i * 2, i * 3); + } + + assertEquals(pq.size(), N); + assertFalse(pq.isEmpty()); + + for (int i = 1; i <= N; i++) { + assertEquals(pq.peekN1(), i); + assertEquals(pq.peekN2(), i * 2); + assertEquals(pq.peekN3(), i * 3); + + pq.pop(); + + assertEquals(pq.size(), N - i); + } + + pq.close(); + } + + @Test + public void testCheckForEmpty() { + TripleLongPriorityQueue pq = new TripleLongPriorityQueue(); + assertEquals(pq.size(), 0); + assertEquals(pq.isEmpty(), true); + + try { + pq.peekN1(); + fail("Should fail"); + } catch (IllegalArgumentException e) { + // Ok + } + + try { + pq.peekN2(); + fail("Should fail"); + } catch (IllegalArgumentException e) { + // Ok + } + + try { + pq.peekN3(); + fail("Should fail"); + } catch (IllegalArgumentException e) { + // Ok + } + + try { + pq.pop(); + fail("Should fail"); + } catch (IllegalArgumentException e) { + // Ok + } + + pq.close(); + } + + @Test + public void testCompareWithSamePrefix() { + TripleLongPriorityQueue pq = new TripleLongPriorityQueue(); + assertEquals(pq.size(), 0); + assertEquals(pq.isEmpty(), true); + + pq.add(10, 20, 30); + pq.add(20, 10, 10); + pq.add(10, 20, 10); + pq.add(10, 30, 10); + pq.add(10, 20, 5); + + assertEquals(pq.size(), 5); + + assertEquals(pq.peekN1(), 10); + assertEquals(pq.peekN2(), 20); + assertEquals(pq.peekN3(), 5); + pq.pop(); + + assertEquals(pq.peekN1(), 10); + assertEquals(pq.peekN2(), 20); + assertEquals(pq.peekN3(), 10); + pq.pop(); + + assertEquals(pq.peekN1(), 10); + assertEquals(pq.peekN2(), 20); + assertEquals(pq.peekN3(), 30); + pq.pop(); + + assertEquals(pq.peekN1(), 10); + assertEquals(pq.peekN2(), 30); + assertEquals(pq.peekN3(), 10); + pq.pop(); + + assertEquals(pq.peekN1(), 20); + assertEquals(pq.peekN2(), 10); + assertEquals(pq.peekN3(), 10); + pq.pop(); + + assertEquals(pq.size(), 0); + assertEquals(pq.isEmpty(), true); + + pq.close(); + } +} diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java index 614527116a9a4..f81c582a1dd8d 100644 --- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java @@ -526,6 +526,18 @@ public TypedMessageBuilder loadConf(Map config) { return this; } + @Override + public TypedMessageBuilder deliverAfter(long delay, TimeUnit unit) { + underlyingBuilder.deliverAfter(delay, unit); + return this; + } + + @Override + public TypedMessageBuilder deliverAt(long timestamp) { + underlyingBuilder.deliverAt(timestamp); + return this; + } + public void setUnderlyingBuilder(TypedMessageBuilder underlyingBuilder) { this.underlyingBuilder = underlyingBuilder; } diff --git a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceProducer.java b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceProducer.java index 779d580d0e5cf..f41c8c706bd95 100644 --- a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceProducer.java +++ b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceProducer.java @@ -22,6 +22,16 @@ import static org.apache.commons.lang3.StringUtils.isBlank; import static org.apache.commons.lang3.StringUtils.isNotBlank; +import com.beust.jcommander.JCommander; +import com.beust.jcommander.Parameter; +import com.beust.jcommander.ParameterException; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.ObjectWriter; +import com.google.common.collect.Lists; +import com.google.common.util.concurrent.RateLimiter; + +import io.netty.util.concurrent.DefaultThreadFactory; + import java.io.FileInputStream; import java.io.FileOutputStream; import java.io.PrintStream; @@ -50,20 +60,11 @@ import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.ProducerBuilder; import org.apache.pulsar.client.api.PulsarClient; +import org.apache.pulsar.client.api.TypedMessageBuilder; import org.apache.pulsar.testclient.utils.PaddingDecimalFormat; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.beust.jcommander.JCommander; -import com.beust.jcommander.Parameter; -import com.beust.jcommander.ParameterException; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.databind.ObjectWriter; -import com.google.common.collect.Lists; -import com.google.common.util.concurrent.RateLimiter; - -import io.netty.util.concurrent.DefaultThreadFactory; - public class PerformanceProducer { private static final ExecutorService executor = Executors @@ -159,6 +160,9 @@ static class Arguments { "--encryption-key-value-file" }, description = "The file which contains the public key to encrypt payload") public String encKeyFile = null; + @Parameter(names = { "-d", + "--delay" }, description = "Mark messages with a given delay in seconds") + public long delay = 0; } public static void main(String[] args) throws Exception { @@ -355,7 +359,12 @@ public void run() { final long sendTime = System.nanoTime(); - producer.sendAsync(payloadData).thenRun(() -> { + TypedMessageBuilder messageBuilder = producer.newMessage() + .value(payloadData); + if (arguments.delay >0) { + messageBuilder.deliverAfter(arguments.delay, TimeUnit.SECONDS); + } + messageBuilder.sendAsync().thenRun(() -> { messagesSent.increment(); bytesSent.add(payloadData.length);