Skip to content

Commit

Permalink
Delayed message delivery implementation (#4062)
Browse files Browse the repository at this point in the history
* Delayed message delivery implementation

* Fixed compilation

* Allow to configure the delayed tracker implementation

* Use int64 for timestamp

* Address comments

* More tests for TripleLongPriorityQueue

* Removing useless sync block that causes deadlock with consumer close

* Fixed merge conflict

* Avoid new list when passing entries to consumer

* Fixed test. Since entries are evicted from cache, they might be resent in diff order

* Fixed context message builder

* Fixed triggering writePromise when last entry was nullified

* Moved entries filtering from consumer to dispatcher

* Added Javadocs

* Reduced synchronized scope to minimum
  • Loading branch information
merlimat authored May 29, 2019
1 parent 59705a8 commit ba24d73
Show file tree
Hide file tree
Showing 32 changed files with 1,471 additions and 48 deletions.
12 changes: 11 additions & 1 deletion conf/broker.conf
Original file line number Diff line number Diff line change
Expand Up @@ -259,6 +259,16 @@ maxMessageSize=5242880
# Interval between checks to see if topics with compaction policies need to be compacted
brokerServiceCompactionMonitorIntervalInSeconds=60

# Whether to enable the delayed delivery for messages.
# If disabled, messages will be immediately delivered and there will
# be no tracking overhead.
delayedDeliveryEnabled=true

# Control the tick time for when retrying on delayed delivery,
# affecting the accuracy of the delivery time compared to the scheduled time.
# Default is 1 second.
delayedDeliveryTickTimeMillis=1000

# Enable tracking of replicated subscriptions state across clusters.
enableReplicatedSubscriptions=true

Expand Down Expand Up @@ -426,7 +436,7 @@ bookkeeperClientReorderReadSequenceEnabled=false
# outside the specified groups will not be used by the broker
bookkeeperClientIsolationGroups=

# Enable bookie secondary-isolation group if bookkeeperClientIsolationGroups doesn't
# Enable bookie secondary-isolation group if bookkeeperClientIsolationGroups doesn't
# have enough bookie available.
bookkeeperClientSecondaryIsolationGroups=

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,16 @@ public class ServiceConfiguration implements PulsarConfiguration {
// waiting for another HTTP call to complete in same thread.
private int numHttpServerThreads = Math.max(8, 2 * Runtime.getRuntime().availableProcessors());

@FieldContext(category = CATEGORY_SERVER, doc = "Whether to enable the delayed delivery for messages.")
private boolean delayedDeliveryEnabled = true;

@FieldContext(category = CATEGORY_SERVER, doc = "Class name of the factory that implements the delayed deliver tracker")
private String delayedDeliveryTrackerFactoryClassName = "org.apache.pulsar.broker.delayed.InMemoryDelayedDeliveryTrackerFactory";

@FieldContext(category = CATEGORY_SERVER, doc = "Control the tick time for when retrying on delayed delivery, "
+ " affecting the accuracy of the delivery time compared to the scheduled time. Default is 1 second.")
private long delayedDeliveryTickTimeMillis = 1000;

@FieldContext(
category = CATEGORY_WEBSOCKET,
doc = "Enable the WebSocket API service in broker"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,10 @@

import java.io.IOException;
import java.net.URI;
import java.util.*;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
Expand Down Expand Up @@ -95,9 +98,9 @@
import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.apache.pulsar.compaction.Compactor;
import org.apache.pulsar.compaction.TwoPhaseCompactor;
import org.apache.pulsar.functions.worker.WorkerUtils;
import org.apache.pulsar.functions.worker.WorkerConfig;
import org.apache.pulsar.functions.worker.WorkerService;
import org.apache.pulsar.functions.worker.WorkerUtils;
import org.apache.pulsar.websocket.WebSocketConsumerServlet;
import org.apache.pulsar.websocket.WebSocketProducerServlet;
import org.apache.pulsar.websocket.WebSocketReaderServlet;
Expand Down Expand Up @@ -543,7 +546,7 @@ private void startZkCacheService() throws PulsarServerException {
this.localZkCache = new LocalZooKeeperCache(getZkClient(), config.getZooKeeperOperationTimeoutSeconds(),
getOrderedExecutor());
this.globalZkCache = new GlobalZooKeeperCache(getZooKeeperClientFactory(),
(int) config.getZooKeeperSessionTimeoutMillis(),
(int) config.getZooKeeperSessionTimeoutMillis(),
config.getZooKeeperOperationTimeoutSeconds(), config.getConfigurationStoreServers(),
getOrderedExecutor(), this.cacheExecutor);
try {
Expand Down Expand Up @@ -883,7 +886,7 @@ public static String brokerUrlTls(String host, int port) {
}

public static String webAddress(ServiceConfiguration config) {
if (config.getWebServicePort().isPresent()) {
if (config.getWebServicePort().isPresent()) {
return webAddress(advertisedAddress(config), config.getWebServicePort().get());
} else {
return null;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.delayed;

import com.google.common.annotations.Beta;

import java.util.Set;

import org.apache.bookkeeper.mledger.impl.PositionImpl;

/**
* Represent the tracker for the delayed delivery of messages for a particular subscription.
*
* Note: this interface is still being refined and some breaking changes might be introduced.
*/
@Beta
public interface DelayedDeliveryTracker extends AutoCloseable {

/**
* Add a message to the tracker
*
* @param ledgerId
* the ledgerId
* @param entryId
* the entryId
* @param deliveryAt
* the absolute timestamp at which the message should be tracked
* @return true if the message was added to the tracker or false if it should be delivered immediately
*/
boolean addMessage(long ledgerId, long entryId, long deliveryAt);

/**
* Return true if there's at least a message that is scheduled to be delivered already
*/
boolean hasMessageAvailable();

/**
* @return the number of delayed messages being tracked
*/
long getNumberOfDelayedMessages();

/**
* Get a set of position of messages that have already reached the delivery time
*/
Set<PositionImpl> getScheduledMessages(int maxMessages);

/**
* Close the subscription tracker and release all resources.
*/
void close();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.delayed;

import com.google.common.annotations.Beta;

import java.io.IOException;

import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers;

/**
* Factory of InMemoryDelayedDeliveryTracker objects. This is the entry point for implementations.
*
* Note: this interface is still being refined and some breaking changes might be introduced.
*/
@Beta
public interface DelayedDeliveryTrackerFactory extends AutoCloseable {
/**
* Initialize the factory implementation from the broker service configuration
*
* @param config
* the broker service config object
*/
void initialize(ServiceConfiguration config) throws IOException;

/**
* Create a new tracker instance.
*
* @param dispatcher
* a multi-consumer dispatcher instance
*/
DelayedDeliveryTracker newTracker(PersistentDispatcherMultipleConsumers dispatcher);

/**
* Close the factory and release all the resources
*/
void close() throws IOException;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.delayed;

import static com.google.common.base.Preconditions.checkArgument;

import java.io.IOException;

import lombok.experimental.UtilityClass;

import org.apache.pulsar.broker.ServiceConfiguration;

@UtilityClass
public class DelayedDeliveryTrackerLoader {
public static DelayedDeliveryTrackerFactory loadDelayedDeliveryTrackerFactory(ServiceConfiguration conf)
throws IOException {
Class<?> factoryClass;
try {
factoryClass = Class.forName(conf.getDelayedDeliveryTrackerFactoryClassName());
Object obj = factoryClass.newInstance();
checkArgument(obj instanceof DelayedDeliveryTrackerFactory,
"The factory has to be an instance of " + DelayedDeliveryTrackerFactory.class.getName());

DelayedDeliveryTrackerFactory factory = (DelayedDeliveryTrackerFactory) obj;
factory.initialize(conf);
return factory;
} catch (Exception e) {
throw new IOException(e);
}
}
}
Loading

0 comments on commit ba24d73

Please sign in to comment.