Skip to content

Commit

Permalink
Allow to configure the delayed tracker implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
merlimat committed Apr 18, 2019
1 parent b54c438 commit d5e1ebc
Show file tree
Hide file tree
Showing 13 changed files with 273 additions and 58 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,9 @@ public class ServiceConfiguration implements PulsarConfiguration {
@FieldContext(category = CATEGORY_SERVER, doc = "Whether to enable the delayed delivery for messages.")
private boolean delayedDeliveryEnabled = true;

@FieldContext(category = CATEGORY_SERVER, doc = "Class name of the factory that implements the delayed deliver tracker")
private String delayedDeliveryTrackerFactoryClassName = "org.apache.pulsar.broker.delayed.InMemoryDelayedDeliveryTrackerFactory";

@FieldContext(category = CATEGORY_SERVER, doc = "Control the tick time for when retrying on delayed delivery, "
+ " affecting the accuracy of the delivery time compared to the scheduled time. Default is 1 second.")
private long delayedDeliveryTickTimeMillis = 1000;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,14 @@
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;

import io.netty.util.HashedWheelTimer;
import io.netty.util.Timer;
import io.netty.util.concurrent.DefaultThreadFactory;

import java.io.IOException;
import java.net.URI;
import java.util.*;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
Expand Down Expand Up @@ -97,9 +98,9 @@
import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.apache.pulsar.compaction.Compactor;
import org.apache.pulsar.compaction.TwoPhaseCompactor;
import org.apache.pulsar.functions.worker.WorkerUtils;
import org.apache.pulsar.functions.worker.WorkerConfig;
import org.apache.pulsar.functions.worker.WorkerService;
import org.apache.pulsar.functions.worker.WorkerUtils;
import org.apache.pulsar.websocket.WebSocketConsumerServlet;
import org.apache.pulsar.websocket.WebSocketProducerServlet;
import org.apache.pulsar.websocket.WebSocketReaderServlet;
Expand Down Expand Up @@ -168,8 +169,6 @@ public class PulsarService implements AutoCloseable {
private SchemaRegistryService schemaRegistryService = null;
private final Optional<WorkerService> functionWorkerService;

private Optional<Timer> delayedDeliveryTimer = Optional.empty();

private final MessagingServiceShutdownHook shutdownService;

private MetricsGenerator metricsGenerator;
Expand Down Expand Up @@ -218,10 +217,6 @@ public void close() throws PulsarServerException {
return;
}

if (delayedDeliveryTimer.isPresent()) {
delayedDeliveryTimer.get().stop();
}

// close the service in reverse order v.s. in which they are started
if (this.webService != null) {
this.webService.close();
Expand Down Expand Up @@ -950,16 +945,6 @@ public SchemaRegistryService getSchemaRegistryService() {
return schemaRegistryService;
}

public synchronized Timer getDelayedDeliveryTimer() {
// Lazy initialize
if (!delayedDeliveryTimer.isPresent()) {
delayedDeliveryTimer = Optional.of(new HashedWheelTimer(new DefaultThreadFactory("pulsar-delayed-delivery"),
config.getDelayedDeliveryTickTimeMillis(), TimeUnit.MILLISECONDS));
}

return delayedDeliveryTimer.get();
}

private void startWorkerService(AuthenticationService authenticationService,
AuthorizationService authorizationService)
throws InterruptedException, IOException, KeeperException {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.delayed;

import com.google.common.annotations.Beta;

import java.util.Set;

import org.apache.bookkeeper.mledger.impl.PositionImpl;

/**
* Represent the tracker for the delayed delivery of messages for a particular subscription.
*
* Note: this interface is still being refined and some breaking changes might be introduced.
*/
@Beta
public interface DelayedDeliveryTracker extends AutoCloseable {

/**
* Add a message to the tracker
*
* @param ledgerId
* the ledgerId
* @param entryId
* the entryId
* @param deliveryAt
* the absolute timestamp at which the message should be tracked
* @return true if the message was added to the tracker or false if it should be delivered immediately
*/
boolean addMessage(long ledgerId, long entryId, long deliveryAt);

/**
* Return true if there's at least a message that is scheduled to be delivered already
*/
boolean hasMessageAvailable();

/**
* @return the number of delayed messages being tracked
*/
long getNumberOfDelayedMessages();

/**
* Get a set of position of messages that have already reached the delivery time
*/
Set<PositionImpl> getScheduledMessages(int maxMessages);

/**
* Close the subscription tracker and release all resources.
*/
void close();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.delayed;

import com.google.common.annotations.Beta;

import java.io.IOException;

import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers;

/**
* Factory of InMemoryDelayedDeliveryTracker objects. This is the entry point for implementations.
*
* Note: this interface is still being refined and some breaking changes might be introduced.
*/
@Beta
public interface DelayedDeliveryTrackerFactory extends AutoCloseable {
/**
* Initialize the factory implementation from the broker service configuration
*
* @param config
* the broker service config object
*/
void initialize(ServiceConfiguration config) throws IOException;

/**
* Create a new tracker instance.
*
* @param dispatcher
* a multi-consumer dispatcher instance
*/
DelayedDeliveryTracker newTracker(PersistentDispatcherMultipleConsumers dispatcher);

/**
* Close the factory and release all the resources
*/
void close() throws IOException;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.delayed;

import static com.google.common.base.Preconditions.checkArgument;

import java.io.IOException;

import lombok.experimental.UtilityClass;

import org.apache.pulsar.broker.ServiceConfiguration;

@UtilityClass
public class DelayedDeliveryTrackerLoader {
public static DelayedDeliveryTrackerFactory loadDelayedDeliveryTrackerFactory(ServiceConfiguration conf)
throws IOException {
// try {
Class<?> factoryClass;
try {
factoryClass = Class.forName(conf.getDelayedDeliveryTrackerFactoryClassName());
Object obj = factoryClass.newInstance();
checkArgument(obj instanceof DelayedDeliveryTrackerFactory,
"The factory has to be an instance of " + DelayedDeliveryTrackerFactory.class.getName());

DelayedDeliveryTrackerFactory factory = (DelayedDeliveryTrackerFactory) obj;
factory.initialize(conf);
return factory;

} catch (Exception e) {
throw new IOException(e);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.service.persistent;
package org.apache.pulsar.broker.delayed;

import io.netty.util.Timeout;
import io.netty.util.Timer;
Expand All @@ -29,10 +29,11 @@
import lombok.extern.slf4j.Slf4j;

import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers;
import org.apache.pulsar.common.util.collections.TripleLongPriorityQueue;

@Slf4j
public class DelayedDeliveryTracker implements AutoCloseable, TimerTask {
public class InMemoryDelayedDeliveryTracker implements DelayedDeliveryTracker, TimerTask {

private final TripleLongPriorityQueue priorityQueue = new TripleLongPriorityQueue();

Expand All @@ -49,13 +50,13 @@ public class DelayedDeliveryTracker implements AutoCloseable, TimerTask {

private final long tickTimeMillis;

public DelayedDeliveryTracker(PersistentDispatcherMultipleConsumers dispatcher) {
InMemoryDelayedDeliveryTracker(PersistentDispatcherMultipleConsumers dispatcher, Timer timer, long tickTimeMillis) {
this.dispatcher = dispatcher;
this.timer = dispatcher.getTopic().getBrokerService().pulsar().getDelayedDeliveryTimer();
this.tickTimeMillis = dispatcher.getTopic().getBrokerService().pulsar().getConfiguration()
.getDelayedDeliveryTickTimeMillis();
this.timer = timer;
this.tickTimeMillis = tickTimeMillis;
}

@Override
public boolean addMessage(long ledgerId, long entryId, long deliveryAt) {
long now = System.currentTimeMillis();
if (log.isDebugEnabled()) {
Expand All @@ -75,13 +76,15 @@ public boolean addMessage(long ledgerId, long entryId, long deliveryAt) {
/**
* Return true if there's at least a message that is scheduled to be delivered already
*/
@Override
public boolean hasMessageAvailable() {
return !priorityQueue.isEmpty() && priorityQueue.peekN1() <= System.currentTimeMillis();
}

/**
* Get a set of position of messages that have already reached
*/
@Override
public Set<PositionImpl> getScheduledMessages(int maxMessages) {
int n = maxMessages;
Set<PositionImpl> positions = new TreeSet<>();
Expand Down Expand Up @@ -111,7 +114,8 @@ public Set<PositionImpl> getScheduledMessages(int maxMessages) {
return positions;
}

public long size() {
@Override
public long getNumberOfDelayedMessages() {
return priorityQueue.size();
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.delayed;

import io.netty.util.HashedWheelTimer;
import io.netty.util.Timer;
import io.netty.util.concurrent.DefaultThreadFactory;

import java.util.concurrent.TimeUnit;

import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers;

public class InMemoryDelayedDeliveryTrackerFactory implements DelayedDeliveryTrackerFactory {

private Timer timer;

private long tickTimeMillis;

@Override
public void initialize(ServiceConfiguration config) {
this.timer = new HashedWheelTimer(new DefaultThreadFactory("pulsar-delayed-delivery"),
config.getDelayedDeliveryTickTimeMillis(), TimeUnit.MILLISECONDS);
this.tickTimeMillis = config.getDelayedDeliveryTickTimeMillis();
}

@Override
public DelayedDeliveryTracker newTracker(PersistentDispatcherMultipleConsumers dispatcher) {
return new InMemoryDelayedDeliveryTracker(dispatcher, timer, tickTimeMillis);
}

@Override
public void close() {
if (timer != null) {
timer.stop();
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -108,12 +108,12 @@ public interface LoadManager {
* @throws Exception
*/
public void disableBroker() throws Exception;

/**
* Get list of available brokers in cluster
*
*
* @return
* @throws Exception
* @throws Exception
*/
Set<String> getAvailableBrokers() throws Exception;

Expand All @@ -134,9 +134,7 @@ static LoadManager create(final PulsarService pulsar) {
// Assume there is a constructor with one argument of PulsarService.
final Object loadManagerInstance = loadManagerClass.newInstance();
if (loadManagerInstance instanceof LoadManager) {
final LoadManager casted = (LoadManager) loadManagerInstance;
casted.initialize(pulsar);
return casted;
return (LoadManager) loadManagerInstance;
} else if (loadManagerInstance instanceof ModularLoadManager) {
final LoadManager casted = new ModularLoadManagerWrapper((ModularLoadManager) loadManagerInstance);
casted.initialize(pulsar);
Expand Down
Loading

0 comments on commit d5e1ebc

Please sign in to comment.