Skip to content

Commit

Permalink
[improve][broker] Make MessageRedeliveryController work more efficien…
Browse files Browse the repository at this point in the history
…tly (#17804)

(cherry picked from commit c60f895)
  • Loading branch information
codelipenghui committed Sep 23, 2022
1 parent aa68ef8 commit 19eb842
Show file tree
Hide file tree
Showing 7 changed files with 391 additions and 41 deletions.
2 changes: 2 additions & 0 deletions distribution/server/src/assemble/LICENSE.bin.txt
Original file line number Diff line number Diff line change
Expand Up @@ -538,6 +538,8 @@ The Apache Software License, Version 2.0
- io.etcd-jetcd-core-0.5.11.jar
* IPAddress
- com.github.seancfoley-ipaddress-5.3.3.jar
* RoaringBitmap
- org.roaringbitmap-RoaringBitmap-0.9.15.jar

BSD 3-clause "New" or "Revised" License
* Google auth library
Expand Down
8 changes: 8 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -263,6 +263,7 @@ flexible messaging model and an intuitive client API.</description>
<j2objc-annotations.version>1.3</j2objc-annotations.version>
<lightproto-maven-plugin.version>0.4</lightproto-maven-plugin.version>
<dependency-check-maven.version>7.1.0</dependency-check-maven.version>
<roaringbitmap.version>0.9.15</roaringbitmap.version>

<!-- Used to configure rename.netty.native. Libs -->
<rename.netty.native.libs>rename-netty-native-libs.sh</rename.netty.native.libs>
Expand Down Expand Up @@ -1276,6 +1277,13 @@ flexible messaging model and an intuitive client API.</description>
<artifactId>reload4j</artifactId>
<version>${reload4j.version}</version>
</dependency>

<dependency>
<groupId>org.roaringbitmap</groupId>
<artifactId>RoaringBitmap</artifactId>
<version>${roaringbitmap.version}</version>
</dependency>

</dependencies>
</dependencyManagement>

Expand Down
5 changes: 5 additions & 0 deletions pulsar-broker/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -347,6 +347,11 @@
<artifactId>hppc</artifactId>
</dependency>

<dependency>
<groupId>org.roaringbitmap</groupId>
<artifactId>RoaringBitmap</artifactId>
</dependency>

<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>pulsar-functions-api-examples</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,46 +22,43 @@
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.pulsar.common.util.collections.ConcurrentLongLongPairHashMap;
import org.apache.pulsar.common.util.collections.ConcurrentLongLongPairHashMap.LongPair;
import org.apache.pulsar.common.util.collections.ConcurrentSortedLongPairSet;
import org.apache.pulsar.common.util.collections.LongPairSet;
import org.apache.pulsar.utils.ConcurrentBitmapSortedLongPairSet;

public class MessageRedeliveryController {
private final LongPairSet messagesToRedeliver;
private final ConcurrentBitmapSortedLongPairSet messagesToRedeliver;
private final ConcurrentLongLongPairHashMap hashesToBeBlocked;

public MessageRedeliveryController(boolean allowOutOfOrderDelivery) {
this.messagesToRedeliver = new ConcurrentSortedLongPairSet(128, 2, true);
this.messagesToRedeliver = new ConcurrentBitmapSortedLongPairSet();
this.hashesToBeBlocked = allowOutOfOrderDelivery
? null
: ConcurrentLongLongPairHashMap
.newBuilder().concurrencyLevel(2).expectedItems(128).autoShrink(true).build();
}

public boolean add(long ledgerId, long entryId) {
return messagesToRedeliver.add(ledgerId, entryId);
public void add(long ledgerId, long entryId) {
messagesToRedeliver.add(ledgerId, entryId);
}

public boolean add(long ledgerId, long entryId, long stickyKeyHash) {
public void add(long ledgerId, long entryId, long stickyKeyHash) {
if (hashesToBeBlocked != null) {
hashesToBeBlocked.put(ledgerId, entryId, stickyKeyHash, 0);
}
return messagesToRedeliver.add(ledgerId, entryId);
messagesToRedeliver.add(ledgerId, entryId);
}

public boolean remove(long ledgerId, long entryId) {
public void remove(long ledgerId, long entryId) {
if (hashesToBeBlocked != null) {
hashesToBeBlocked.remove(ledgerId, entryId);
}
return messagesToRedeliver.remove(ledgerId, entryId);
messagesToRedeliver.remove(ledgerId, entryId);
}

public int removeAllUpTo(long markDeleteLedgerId, long markDeleteEntryId) {
public void removeAllUpTo(long markDeleteLedgerId, long markDeleteEntryId) {
if (hashesToBeBlocked != null) {
List<LongPair> keysToRemove = new ArrayList<>();
hashesToBeBlocked.forEach((ledgerId, entryId, stickyKeyHash, none) -> {
Expand All @@ -73,10 +70,7 @@ public int removeAllUpTo(long markDeleteLedgerId, long markDeleteEntryId) {
keysToRemove.forEach(longPair -> hashesToBeBlocked.remove(longPair.first, longPair.second));
keysToRemove.clear();
}
return messagesToRedeliver.removeIf((ledgerId, entryId) -> {
return ComparisonChain.start().compare(ledgerId, markDeleteLedgerId).compare(entryId, markDeleteEntryId)
.result() <= 0;
});
messagesToRedeliver.removeUpTo(markDeleteLedgerId, markDeleteEntryId + 1);
}

public boolean isEmpty() {
Expand Down Expand Up @@ -107,17 +101,6 @@ public boolean containsStickyKeyHashes(Set<Integer> stickyKeyHashes) {
}

public Set<PositionImpl> getMessagesToReplayNow(int maxMessagesToRead) {
if (hashesToBeBlocked != null) {
// allowOutOfOrderDelivery is false
return messagesToRedeliver.items().stream()
.sorted((l1, l2) -> ComparisonChain.start().compare(l1.first, l2.first)
.compare(l1.second, l2.second).result())
.limit(maxMessagesToRead).map(longPair -> new PositionImpl(longPair.first, longPair.second))
.collect(Collectors.toCollection(TreeSet::new));
} else {
// allowOutOfOrderDelivery is true
return messagesToRedeliver.items(maxMessagesToRead,
(ledgerId, entryId) -> new PositionImpl(ledgerId, entryId));
}
return messagesToRedeliver.items(maxMessagesToRead, PositionImpl::new);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.utils;

import java.util.Iterator;
import java.util.Map;
import java.util.NavigableMap;
import java.util.NavigableSet;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.pulsar.common.util.collections.LongPairSet;
import org.roaringbitmap.RoaringBitmap;

public class ConcurrentBitmapSortedLongPairSet {

private final NavigableMap<Long, RoaringBitmap> map = new TreeMap<>();
private final ReadWriteLock lock = new ReentrantReadWriteLock();

public void add(long item1, long item2) {
lock.writeLock().lock();
try {
RoaringBitmap bitSet = map.computeIfAbsent(item1, k -> new RoaringBitmap());
bitSet.add(item2, item2 + 1);
} finally {
lock.writeLock().unlock();
}
}

public void remove(long item1, long item2) {
lock.writeLock().lock();
try {
RoaringBitmap bitSet = map.get(item1);
if (bitSet != null) {
bitSet.remove(item2, item2 + 1);
if (bitSet.isEmpty()) {
map.remove(item1, bitSet);
}
}
} finally {
lock.writeLock().unlock();
}
}

public boolean contains(long item1, long item2) {
lock.readLock().lock();
try {
RoaringBitmap bitSet = map.get(item1);
return bitSet != null && bitSet.contains(item2, item2 + 1);
} finally {
lock.readLock().unlock();
}
}

public void removeUpTo(long item1, long item2) {
lock.writeLock().lock();
try {
Map.Entry<Long, RoaringBitmap> firstEntry = map.firstEntry();
while (firstEntry != null && firstEntry.getKey() <= item1) {
if (firstEntry.getKey() < item1) {
map.remove(firstEntry.getKey(), firstEntry.getValue());
} else {
RoaringBitmap bitSet = firstEntry.getValue();
if (bitSet != null) {
bitSet.remove(0, item2);
if (bitSet.isEmpty()) {
map.remove(firstEntry.getKey(), bitSet);
}
}
break;
}
firstEntry = map.firstEntry();
}
} finally {
lock.writeLock().unlock();
}
}


public <T> Set<T> items(int numberOfItems, LongPairSet.LongPairFunction<T> longPairConverter) {
NavigableSet<T> items = new TreeSet<>();
lock.readLock().lock();
try {
for (Map.Entry<Long, RoaringBitmap> entry : map.entrySet()) {
Iterator<Integer> iterator = entry.getValue().stream().iterator();
while (iterator.hasNext() && items.size() < numberOfItems) {
items.add(longPairConverter.apply(entry.getKey(), iterator.next()));
}
if (items.size() == numberOfItems) {
break;
}
}
} finally {
lock.readLock().unlock();
}
return items;
}

public boolean isEmpty() {
lock.readLock().lock();
try {
return map.isEmpty() || map.values().stream().allMatch(RoaringBitmap::isEmpty);
} finally {
lock.readLock().unlock();
}
}

public void clear() {
lock.writeLock().lock();
try {
map.clear();
} finally {
lock.writeLock().unlock();
}
}

public int size() {
lock.readLock().lock();
try {
return map.isEmpty() ? 0 : map.values().stream().mapToInt(RoaringBitmap::getCardinality).sum();
} finally {
lock.readLock().unlock();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,8 @@
import java.util.Set;
import java.util.TreeSet;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.pulsar.utils.ConcurrentBitmapSortedLongPairSet;
import org.apache.pulsar.common.util.collections.ConcurrentLongLongPairHashMap;
import org.apache.pulsar.common.util.collections.LongPairSet;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

Expand All @@ -48,7 +48,8 @@ public void testAddAndRemove(boolean allowOutOfOrderDelivery) throws Exception {

Field messagesToRedeliverField = MessageRedeliveryController.class.getDeclaredField("messagesToRedeliver");
messagesToRedeliverField.setAccessible(true);
LongPairSet messagesToRedeliver = (LongPairSet) messagesToRedeliverField.get(controller);
ConcurrentBitmapSortedLongPairSet messagesToRedeliver =
(ConcurrentBitmapSortedLongPairSet) messagesToRedeliverField.get(controller);

Field hashesToBeBlockedField = MessageRedeliveryController.class.getDeclaredField("hashesToBeBlocked");
hashesToBeBlockedField.setAccessible(true);
Expand All @@ -67,9 +68,8 @@ public void testAddAndRemove(boolean allowOutOfOrderDelivery) throws Exception {
assertEquals(hashesToBeBlocked.size(), 0);
}

assertTrue(controller.add(1, 1));
assertTrue(controller.add(1, 2));
assertFalse(controller.add(1, 1));
controller.add(1, 1);
controller.add(1, 2);

assertFalse(controller.isEmpty());
assertEquals(messagesToRedeliver.size(), 2);
Expand All @@ -81,9 +81,8 @@ public void testAddAndRemove(boolean allowOutOfOrderDelivery) throws Exception {
assertFalse(hashesToBeBlocked.containsKey(1, 2));
}

assertTrue(controller.remove(1, 1));
assertTrue(controller.remove(1, 2));
assertFalse(controller.remove(1, 1));
controller.remove(1, 1);
controller.remove(1, 2);

assertTrue(controller.isEmpty());
assertEquals(messagesToRedeliver.size(), 0);
Expand All @@ -93,10 +92,9 @@ public void testAddAndRemove(boolean allowOutOfOrderDelivery) throws Exception {
assertEquals(hashesToBeBlocked.size(), 0);
}

assertTrue(controller.add(2, 1, 100));
assertTrue(controller.add(2, 2, 101));
assertTrue(controller.add(2, 3, 101));
assertFalse(controller.add(2, 1, 100));
controller.add(2, 1, 100);
controller.add(2, 2, 101);
controller.add(2, 3, 101);

assertFalse(controller.isEmpty());
assertEquals(messagesToRedeliver.size(), 3);
Expand Down
Loading

0 comments on commit 19eb842

Please sign in to comment.