Skip to content

Commit

Permalink
[feat][broker][PIP-195] New bucket based delayed message tracker - in…
Browse files Browse the repository at this point in the history
…terface&config&proto -part 1 (#17344)
  • Loading branch information
coderzc authored Sep 14, 2022
1 parent e238dcc commit 6bc3016
Show file tree
Hide file tree
Showing 6 changed files with 185 additions and 2 deletions.
21 changes: 21 additions & 0 deletions conf/broker.conf
Original file line number Diff line number Diff line change
Expand Up @@ -534,6 +534,11 @@ brokerServiceCompactionPhaseOneLoopTimeInSeconds=30
# be no tracking overhead.
delayedDeliveryEnabled=true

# Class name of the factory that implements the delayed deliver tracker.
# If value is "org.apache.pulsar.broker.delayed.BucketDelayedDeliveryTrackerFactory",
# will create bucket based delayed message index tracker.
delayedDeliveryTrackerFactoryClassName=org.apache.pulsar.broker.delayed.InMemoryDelayedDeliveryTrackerFactory

# Control the tick time for when retrying on delayed delivery,
# affecting the accuracy of the delivery time compared to the scheduled time.
# Note that this time is used to configure the HashedWheelTimer's tick time for the
Expand All @@ -549,6 +554,22 @@ delayedDeliveryTickTimeMillis=1000
# delayedDeliveryTickTimeMillis.
isDelayedDeliveryDeliverAtTimeStrict=false

# The delayed message index bucket min index count.
# When the index count of the current bucket is more than this value and all message indexes of current ledger
# have already been added to the tracker we will seal the bucket.
delayedDeliveryMinIndexCountPerBucket=50000

# The delayed message index bucket time step(in seconds) in per bucket snapshot segment,
# after reaching the max time step limitation, the snapshot segment will be cut off.
delayedDeliveryMaxTimeStepPerBucketSnapshotSegmentSeconds=300

# The max number of delayed message index bucket,
# after reaching the max buckets limitation, the adjacent buckets will be merged.
delayedDeliveryMaxNumBuckets=50

# Enable share the delayed message index across subscriptions
delayedDeliverySharedIndexEnabled=false

# Whether to enable acknowledge of batch local index.
acknowledgmentAtBatchIndexLevelEnabled=false

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -330,8 +330,11 @@ public class ServiceConfiguration implements PulsarConfiguration {
@FieldContext(category = CATEGORY_SERVER, doc = "Whether to enable the delayed delivery for messages.")
private boolean delayedDeliveryEnabled = true;

@FieldContext(category = CATEGORY_SERVER, doc = "Class name of the factory that implements the delayed deliver "
+ "tracker")
@FieldContext(category = CATEGORY_SERVER, doc = """
Class name of the factory that implements the delayed deliver tracker.
If value is "org.apache.pulsar.broker.delayed.BucketDelayedDeliveryTrackerFactory", \
will create bucket based delayed message index tracker.
""")
private String delayedDeliveryTrackerFactoryClassName = "org.apache.pulsar.broker.delayed"
+ ".InMemoryDelayedDeliveryTrackerFactory";

Expand All @@ -350,6 +353,25 @@ public class ServiceConfiguration implements PulsarConfiguration {
+ "delayedDeliveryTickTimeMillis.")
private boolean isDelayedDeliveryDeliverAtTimeStrict = false;

@FieldContext(category = CATEGORY_SERVER, doc = """
The delayed message index bucket min index count. When the index count of the current bucket is more than \
this value and all message indexes of current ledger have already been added to the tracker \
we will seal the bucket.""")
private long delayedDeliveryMinIndexCountPerBucket = 50000;

@FieldContext(category = CATEGORY_SERVER, doc = """
The delayed message index bucket time step(in seconds) in per bucket snapshot segment, \
after reaching the max time step limitation, the snapshot segment will be cut off.""")
private long delayedDeliveryMaxTimeStepPerBucketSnapshotSegmentSeconds = 300;

@FieldContext(category = CATEGORY_SERVER, doc = """
The max number of delayed message index bucket, \
after reaching the max buckets limitation, the adjacent buckets will be merged.""")
private int delayedDeliveryMaxNumBuckets = 50;

@FieldContext(category = CATEGORY_SERVER, doc = "Enable share the delayed message index across subscriptions")
private boolean delayedDeliverySharedIndexEnabled = false;

@FieldContext(category = CATEGORY_SERVER, doc = "Whether to enable the acknowledge of batch local index")
private boolean acknowledgmentAtBatchIndexLevelEnabled = false;

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.delayed;

import java.util.List;
import java.util.concurrent.CompletableFuture;
import org.apache.pulsar.broker.delayed.proto.DelayedMessageIndexBucketSnapshotFormat.SnapshotMetadata;
import org.apache.pulsar.broker.delayed.proto.DelayedMessageIndexBucketSnapshotFormat.SnapshotSegment;

public interface BucketSnapshotStorage {

/**
* Create a delayed message index bucket snapshot with metadata and bucketSnapshotSegments.
*
* @param snapshotMetadata the metadata of snapshot
* @param bucketSnapshotSegments the list of snapshot segments
* @return the future with bucketId(ledgerId).
*/
CompletableFuture<Long> createBucketSnapshot(SnapshotMetadata snapshotMetadata,
List<SnapshotSegment> bucketSnapshotSegments);

/**
* Get delayed message index bucket snapshot metadata.
*
* @param bucketId the bucketId of snapshot
* @return the future with snapshot expanded metadata
*/
CompletableFuture<SnapshotMetadata> getBucketSnapshotMetadata(long bucketId);

/**
* Get a sequence of delayed message index bucket snapshot segments.
*
* @param bucketId the bucketId of snapshot
* @param firstSegmentEntryId entryId of first segment of sequence
* @param lastSegmentEntryId entryId of last segment of sequence
* @return the future with snapshot segment
*/
CompletableFuture<List<SnapshotSegment>> getBucketSnapshotSegment(long bucketId, long firstSegmentEntryId,
long lastSegmentEntryId);

/**
* Get total byte length of delayed message index bucket snapshot.
*
* @param bucketId the bucketId of snapshot
* @return the future with byte length of snapshot
*/
CompletableFuture<Long> getBucketSnapshotLength(long bucketId);

/**
* Delete delayed message index bucket snapshot by bucketId.
*
* @param bucketId the bucketId of snapshot
*/
CompletableFuture<Void> deleteBucketSnapshot(long bucketId);

/**
* Start the bucket snapshot storage service.
*/
void start() throws Exception;

/**
* Close the bucket snapshot storage service.
*/
void close() throws Exception;
}
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,11 @@ public interface DelayedDeliveryTracker extends AutoCloseable {
*/
long getNumberOfDelayedMessages();

/**
* The amount of memory used to back the delayed message index.
*/
long getBufferMemoryUsage();

/**
* Get a set of position of messages that have already reached the delivery time.
*/
Expand All @@ -61,6 +66,12 @@ public interface DelayedDeliveryTracker extends AutoCloseable {
*/
boolean shouldPauseAllDeliveries();

/**
* Tells whether this DelayedDeliveryTracker contains this message index,
* if the tracker is not supported it or disabled this feature also will return false.
*/
boolean containsMessage(long ledgerId, long entryId);

/**
* Reset tick time use zk policies cache.
* @param tickTime
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,7 @@ public long getNumberOfDelayedMessages() {
return priorityQueue.size();
}

@Override
public long getBufferMemoryUsage() {
return priorityQueue.bytesCapacity();
}
Expand Down Expand Up @@ -286,4 +287,9 @@ public boolean shouldPauseAllDeliveries() {
&& priorityQueue.size() >= DETECT_FIXED_DELAY_LOOKAHEAD_MESSAGES
&& !hasMessageAvailable();
}

@Override
public boolean containsMessage(long ledgerId, long entryId) {
return false;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
syntax = "proto2";

package pulsar.delay;
option java_package = "org.apache.pulsar.broker.delayed.proto";
option optimize_for = SPEED;

message DelayedIndex {
required uint64 timestamp = 1;
required int64 ledger_id = 2;
required int64 entry_id = 3;
}

message SnapshotSegmentMetadata {
map<uint64, bytes> delayed_index_bit_map = 1;
required uint64 max_schedule_timestamp = 2;
}

message SnapshotSegment {
repeated DelayedIndex indexes = 1;
}

message SnapshotMetadata {
repeated SnapshotSegmentMetadata metadata_list = 1;
}

0 comments on commit 6bc3016

Please sign in to comment.