-
Notifications
You must be signed in to change notification settings - Fork 3.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Delayed message delivery implementation #4062
Conversation
@merlimat : how is this different from #3155? Also I think there was a long thread discussion about delayed message implementation. there were pushbacks on implementing delayed messages on brokers. hence a lot of efforts were postponed due to you and bunch of other people had concerns on the solutions to PIP-26 and #3155. but the approach here seems to take the broker-side approach again. I am wondering what is the thought behind this. How does this different from other proposals? Beside the implementation, since there was already a long discussion about delayed messages and I have spent the time on pushing the discussion and other people's efforts forward. Isn't it better to first get an agreement (or at least update the discussion thread) before starting a new PR? |
nvm. I saw the email in the email thread now. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I have looked into the pull request. This is actually a simpler implementation of PIP-26.
The DelayedDeliveryTracker
in this pull request is what is called delayed message index
in PIP-26.
In this pull request, the tracker is a priority queue, all in memory, and rebuild by replaying the messages after a broker crash.
In PIP-26, the tracker is a hash-wheel time-partitioned index. it can be all-in-memory and rebuilt by replaying the messages after a broker crash; or the time-partitioned index can be stored in ledgers to avoid replaying the messages to rebuild the index.
in theory, I don't see any technical differences between PIP-26 and #4062. In fact, I think #4062 is a simpler implementation of PIP-26 whose delayed message index is implemented using a priority queue. If so, how does this PR address the concerns raised when PIP-26 was started (i.e. making changes to dispatcher). FYI, PIP-26 was postponed because there were concerns about adding changes to dispatcher.
Is that a bad thing? Is there any limitation in this approach?
The changes to the dispatcher itself have been isolated in a very few specific points. It should be easy to review and verify that with feature turned off there's zero impact in current behavior. The biggest difference with this PR is that the tracking happens entirely off-heap, in direct memory. There are no objects created and retained for extended amount of time, which is the pattern that will kill the GC performances. A topic will have ByteBuf using direct memory where the priority queue is stored. On the data path there are no other allocations required. |
Nice to see that change, is there a config option to turn it ON per namespace?
How does this work with load balancing? Does the load balancer know which topics are going to need this allocation. Is there a limit on delay, num of delayed messages pending etc? What's the limits? How is inversion handled? What happens if a message to be delayed by for eg: a few days is at the head? Will that halt the advance of the delete cursor? What if a few of those kind are randomly spread around? Is this taking for granted that on a broker restart, everything spanning the period of largest delay will potentially be read through again? Is there a checkpoint for a polite shutdown/unload? I prefer configurable limits, and deterministic performance so that system behavior can be predicted during rolling upgrades and failures. Pulsar handles rolling upgrades and failures way better than other systems , and it would be preferable to maintain that. |
To me this is one of the best things about this pull, and absolutely not a bad thing. I still didn't take a deep look but my only concern would be, how will behave when very different range of delay arrive? Users sometimes makes an abusive use from this type of feature. The improvement I really like from this is that both features (this and #4062) uses a priority queue but this pull uses the buffer in direct memory 👍
Since each adjacent message could have an arbitrary delay I can't see how collapsing by id range could be made. |
It is not a bad thing. I am actually super happy to see this happen because I am a supporter for broker-side approaches from the beginning (if you have followed the email discussion).
If you took a look at my comment, PIP-26 also isolates the changes to a structure called
I don't think the biggest difference with this PR and PIP-26 is the direct memory thing you mentioned on implementing DelayDeliveryTracker. The delayed message index in PIP-26 can also be implemented using direct memory without allocation. IMO the difference between this PR and PIP-26 is -
Lastly, PIP-26 already presents changes regarding api, protocol, namespace policies and many other changes around this area. Shall we just pickup the proposed changes there instead of starting a new effort? |
That's a very good point. It would be good to have a DelayedDeliveryTracker as an interface and we can have different implementations. That will help:
I'll update this PR to make the interface configurable.
In PIP-26 the proposed API methods were: // message to be delivered at the configured delay interval
producer.newMessage().delayAt(3L, TimeUnit.MINUTE).value("Hello Pulsar!").send();
// message to be delivered at the configure time.
producer.newMessage().scheduleAt(new Date(2018, 10, 31, 23, 00, 00)) In this PR I'm proposing: producer.newMessage().deliverAfter(3, TimeUnit.MINUTE).value("hello").send();
producer.newMessage().deliverAt(timestamp).value("hello").send(); My reasons are:
For the protobuf metadata change, in PIP-26 was : // the message will be delayed at delivery by `delayed_ms` milliseconds.
optional int64 delayed_ms = 18; Though that won't support specifying absolute time of scheduling scheduling. Instead, I propose to start with // Mark the message to be delivered at or after the specified timestamp
optional uint64 deliver_at_time = 18; Initially, with relative delays the client will just apply based on its current time. Once we have broker assigned timestamp (stored within the message metadata), then we could add a second field. |
@merlimat : great! These comments around API and protocol changes are great if it can be done when PIP-26 was sent out. A |
Also can you provide a namespace policy to enable and disable this feature per namespace as what PIP-26 proposed? It doesn't have to be in this PR, but an issue filed for tracking this is good. |
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
Outdated
Show resolved
Hide resolved
.../java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
Show resolved
Hide resolved
.../java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
Show resolved
Hide resolved
run java8 tests |
.../java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
Outdated
Show resolved
Hide resolved
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
Outdated
Show resolved
Hide resolved
run java8 tests |
1 similar comment
run java8 tests |
pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/DelayedDeliveryTrackerLoader.java
Outdated
Show resolved
Hide resolved
@Override | ||
public Set<PositionImpl> getScheduledMessages(int maxMessages) { | ||
int n = maxMessages; | ||
Set<PositionImpl> positions = new TreeSet<>(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
if we already know life cycle of PositionImpl
then can we use PositionImplRecylce
instead?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I wanted to keep it simple for now. We can iteratively improve and optimize.
if (log.isDebugEnabled()) { | ||
log.debug("[{}] Get scheduled messags - found {}", dispatcher.getName(), positions.size()); | ||
} | ||
updateTimer(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why are we updating timer here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We took items out of the queue, we need to adjust the timer for the next scheduled message
...-common/src/main/java/org/apache/pulsar/common/util/collections/TripleLongPriorityQueue.java
Show resolved
Hide resolved
@rdhabalia @ivankelly Please take another look. |
run java8 tests |
@rdhabalia @ivankelly Please take another look, so that we can wrap up the features for 2.4.0. |
.../java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
Outdated
Show resolved
Hide resolved
@Slf4j | ||
public class InMemoryDelayedDeliveryTracker implements DelayedDeliveryTracker, TimerTask { | ||
|
||
private final TripleLongPriorityQueue priorityQueue = new TripleLongPriorityQueue(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This queue is unbounded. It could potentially allow someone to DOS the broker, by just allowing them to send a bunch of messages with a delivery date far in the future. We should degrade gracefully from this, though I'm not sure what the nicest behaviour would be for the user. Maybe if the queue is full, force delivery from the head of the queue or something.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, the idea was to start with simple implementation and iterate from that, based on the observed issues/weaknesses.
Also, there are 2 ways to address that:
- The feature can be disabled on server side
- The tracker implementation is pluggable, so one could either expand the current one or provide an alternative implementation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ok, I'll +1 this one, but this DOS should be dealt with asap.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The cap on the mem size will need to be applied per-broker though rather than per-topic
run java8 tests |
1 similar comment
run java8 tests |
Motivation
Fixes #2375
Allow the option to mark messages for delayed delivery.
Notes:
Implementation
Possible improvements
The goal of this PR is to have simple working solution that can be used to efficiently apply delay on 10s of millions of messages at any given time.
There are several improvements that could be considered, based on real-world usage feedback.
For example: