diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java index 6001ef3c03b14..f6de52ca5c61a 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java @@ -688,7 +688,7 @@ private boolean hasMessagesToReplay() { return false; } - private Set getMessagesToReplayNow(int maxMessagesToRead) { + private synchronized Set getMessagesToReplayNow(int maxMessagesToRead) { if (!messagesToRedeliver.isEmpty()) { return messagesToRedeliver.items(maxMessagesToRead, (ledgerId, entryId) -> new PositionImpl(ledgerId, entryId)); @@ -697,7 +697,7 @@ private Set getMessagesToReplayNow(int maxMessagesToRead) { } } - public long getNumberOfDelayedMessages() { + public synchronized long getNumberOfDelayedMessages() { if (delayedDeliveryTracker.isPresent()) { return delayedDeliveryTracker.get().getNumberOfDelayedMessages(); } else { diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java b/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java index 2f02e37c30ed4..9199b4ee7cfde 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java @@ -3086,7 +3086,7 @@ public interface MessageMetadataOrBuilder boolean hasPartitionKeyB64Encoded(); boolean getPartitionKeyB64Encoded(); - // optional uint64 deliver_at_time = 18; + // optional int64 deliver_at_time = 18; boolean hasDeliverAtTime(); long getDeliverAtTime(); } @@ -3399,7 +3399,7 @@ public boolean getPartitionKeyB64Encoded() { return partitionKeyB64Encoded_; } - // optional uint64 deliver_at_time = 18; + // optional int64 deliver_at_time = 18; public static final int DELIVER_AT_TIME_FIELD_NUMBER = 18; private long deliverAtTime_; public boolean hasDeliverAtTime() { @@ -3518,7 +3518,7 @@ public void writeTo(org.apache.pulsar.common.util.protobuf.ByteBufCodedOutputStr output.writeBool(17, partitionKeyB64Encoded_); } if (((bitField0_ & 0x00002000) == 0x00002000)) { - output.writeUInt64(18, deliverAtTime_); + output.writeInt64(18, deliverAtTime_); } } @@ -3599,7 +3599,7 @@ public int getSerializedSize() { } if (((bitField0_ & 0x00002000) == 0x00002000)) { size += org.apache.pulsar.shaded.com.google.protobuf.v241.CodedOutputStream - .computeUInt64Size(18, deliverAtTime_); + .computeInt64Size(18, deliverAtTime_); } memoizedSerializedSize = size; return size; @@ -4072,7 +4072,7 @@ public Builder mergeFrom( } case 144: { bitField0_ |= 0x00010000; - deliverAtTime_ = input.readUInt64(); + deliverAtTime_ = input.readInt64(); break; } } @@ -4657,7 +4657,7 @@ public Builder clearPartitionKeyB64Encoded() { return this; } - // optional uint64 deliver_at_time = 18; + // optional int64 deliver_at_time = 18; private long deliverAtTime_ ; public boolean hasDeliverAtTime() { return ((bitField0_ & 0x00010000) == 0x00010000); diff --git a/pulsar-common/src/main/proto/PulsarApi.proto b/pulsar-common/src/main/proto/PulsarApi.proto index e222592dffb89..b870a0fe07bac 100644 --- a/pulsar-common/src/main/proto/PulsarApi.proto +++ b/pulsar-common/src/main/proto/PulsarApi.proto @@ -114,7 +114,7 @@ message MessageMetadata { optional bool partition_key_b64_encoded = 17 [ default = false ]; // Mark the message to be delivered at or after the specified timestamp - optional uint64 deliver_at_time = 18; + optional int64 deliver_at_time = 18; }