From e11497fcac861e912dfffbbe9d29fa95ed2d8862 Mon Sep 17 00:00:00 2001 From: thetumbled <843221020@qq.com> Date: Tue, 26 Nov 2024 16:40:07 +0800 Subject: [PATCH 1/3] use Long2ObjectMap to implement. --- .../service/InMemoryRedeliveryTracker.java | 86 +++++++++++++++---- 1 file changed, 68 insertions(+), 18 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/InMemoryRedeliveryTracker.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/InMemoryRedeliveryTracker.java index 12e28793557b3..211ef6264d257 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/InMemoryRedeliveryTracker.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/InMemoryRedeliveryTracker.java @@ -19,48 +19,98 @@ package org.apache.pulsar.broker.service; import java.util.List; +import java.util.concurrent.locks.StampedLock; + +import it.unimi.dsi.fastutil.longs.Long2IntMap; +import it.unimi.dsi.fastutil.longs.Long2IntOpenHashMap; +import it.unimi.dsi.fastutil.longs.Long2LongMap; +import it.unimi.dsi.fastutil.longs.Long2LongOpenHashMap; +import it.unimi.dsi.fastutil.longs.Long2ObjectMap; +import it.unimi.dsi.fastutil.longs.Long2ObjectOpenHashMap; import org.apache.bookkeeper.mledger.Position; import org.apache.pulsar.common.util.collections.ConcurrentLongLongPairHashMap; import org.apache.pulsar.common.util.collections.ConcurrentLongLongPairHashMap.LongPair; -public class InMemoryRedeliveryTracker implements RedeliveryTracker { - - private ConcurrentLongLongPairHashMap trackerCache = ConcurrentLongLongPairHashMap.newBuilder() - .concurrencyLevel(1) - .expectedItems(256) - .autoShrink(true) - .build(); +public class InMemoryRedeliveryTracker extends StampedLock implements RedeliveryTracker { + // ledgerId -> entryId -> count + private Long2ObjectMap trackerCache = new Long2ObjectOpenHashMap<>(); @Override public int incrementAndGetRedeliveryCount(Position position) { - Position positionImpl = position; - LongPair count = trackerCache.get(positionImpl.getLedgerId(), positionImpl.getEntryId()); - int newCount = (int) (count != null ? count.first + 1 : 1); - trackerCache.put(positionImpl.getLedgerId(), positionImpl.getEntryId(), newCount, 0L); + long stamp = writeLock(); + int newCount; + try { + Long2IntMap entryMap = trackerCache.computeIfAbsent(position.getLedgerId(), + k -> new Long2IntOpenHashMap()); + newCount = entryMap.getOrDefault(position.getEntryId(), 0) + 1; + entryMap.put(position.getEntryId(), newCount); + } finally { + unlockWrite(stamp); + } return newCount; } @Override public int getRedeliveryCount(long ledgerId, long entryId) { - LongPair count = trackerCache.get(ledgerId, entryId); - return (int) (count != null ? count.first : 0); + long stamp = tryOptimisticRead(); + Long2IntMap entryMap = trackerCache.get(ledgerId); + int count = entryMap != null ? entryMap.get(entryId) : 0; + if (!validate(stamp)) { + stamp = readLock(); + try { + entryMap = trackerCache.get(ledgerId); + count = entryMap != null ? entryMap.get(entryId) : 0; + } finally { + unlockRead(stamp); + } + } + return count; } @Override public void remove(Position position) { - Position positionImpl = position; - trackerCache.remove(positionImpl.getLedgerId(), positionImpl.getEntryId()); + long stamp = writeLock(); + try { + Long2IntMap entryMap = trackerCache.get(position.getLedgerId()); + if (entryMap != null) { + entryMap.remove(position.getEntryId()); + if (entryMap.isEmpty()) { + trackerCache.remove(position.getLedgerId()); + } + } + } finally { + unlockWrite(stamp); + } } @Override public void removeBatch(List positions) { - if (positions != null) { - positions.forEach(this::remove); + if (positions == null) { + return; + } + long stamp = writeLock(); + try { + for (Position position : positions) { + Long2IntMap entryMap = trackerCache.get(position.getLedgerId()); + if (entryMap != null) { + entryMap.remove(position.getEntryId()); + if (entryMap.isEmpty()) { + trackerCache.remove(position.getLedgerId()); + } + } + } + } finally { + unlockWrite(stamp); } } @Override public void clear() { - trackerCache.clear(); + long stamp = writeLock(); + try { + trackerCache.clear(); + } finally { + unlockWrite(stamp); + } } } From 6c3dad8d2d787da18e3dda24e60f1a9cc3297a40 Mon Sep 17 00:00:00 2001 From: thetumbled <843221020@qq.com> Date: Tue, 26 Nov 2024 17:01:56 +0800 Subject: [PATCH 2/3] fix checkstyle. --- .../pulsar/broker/service/InMemoryRedeliveryTracker.java | 9 ++------- 1 file changed, 2 insertions(+), 7 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/InMemoryRedeliveryTracker.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/InMemoryRedeliveryTracker.java index 211ef6264d257..60564efdccd78 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/InMemoryRedeliveryTracker.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/InMemoryRedeliveryTracker.java @@ -18,18 +18,13 @@ */ package org.apache.pulsar.broker.service; -import java.util.List; -import java.util.concurrent.locks.StampedLock; - import it.unimi.dsi.fastutil.longs.Long2IntMap; import it.unimi.dsi.fastutil.longs.Long2IntOpenHashMap; -import it.unimi.dsi.fastutil.longs.Long2LongMap; -import it.unimi.dsi.fastutil.longs.Long2LongOpenHashMap; import it.unimi.dsi.fastutil.longs.Long2ObjectMap; import it.unimi.dsi.fastutil.longs.Long2ObjectOpenHashMap; +import java.util.List; +import java.util.concurrent.locks.StampedLock; import org.apache.bookkeeper.mledger.Position; -import org.apache.pulsar.common.util.collections.ConcurrentLongLongPairHashMap; -import org.apache.pulsar.common.util.collections.ConcurrentLongLongPairHashMap.LongPair; public class InMemoryRedeliveryTracker extends StampedLock implements RedeliveryTracker { // ledgerId -> entryId -> count From c37e6050d9a9f7a7becbe845bf1024170312218d Mon Sep 17 00:00:00 2001 From: thetumbled <843221020@qq.com> Date: Wed, 27 Nov 2024 10:22:55 +0800 Subject: [PATCH 3/3] use composition instead of inheritance. --- .../service/InMemoryRedeliveryTracker.java | 27 ++++++++++--------- 1 file changed, 14 insertions(+), 13 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/InMemoryRedeliveryTracker.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/InMemoryRedeliveryTracker.java index 60564efdccd78..669562055214c 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/InMemoryRedeliveryTracker.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/InMemoryRedeliveryTracker.java @@ -26,13 +26,14 @@ import java.util.concurrent.locks.StampedLock; import org.apache.bookkeeper.mledger.Position; -public class InMemoryRedeliveryTracker extends StampedLock implements RedeliveryTracker { +public class InMemoryRedeliveryTracker implements RedeliveryTracker { // ledgerId -> entryId -> count private Long2ObjectMap trackerCache = new Long2ObjectOpenHashMap<>(); + private final StampedLock rwLock = new StampedLock(); @Override public int incrementAndGetRedeliveryCount(Position position) { - long stamp = writeLock(); + long stamp = rwLock.writeLock(); int newCount; try { Long2IntMap entryMap = trackerCache.computeIfAbsent(position.getLedgerId(), @@ -40,23 +41,23 @@ public int incrementAndGetRedeliveryCount(Position position) { newCount = entryMap.getOrDefault(position.getEntryId(), 0) + 1; entryMap.put(position.getEntryId(), newCount); } finally { - unlockWrite(stamp); + rwLock.unlockWrite(stamp); } return newCount; } @Override public int getRedeliveryCount(long ledgerId, long entryId) { - long stamp = tryOptimisticRead(); + long stamp = rwLock.tryOptimisticRead(); Long2IntMap entryMap = trackerCache.get(ledgerId); int count = entryMap != null ? entryMap.get(entryId) : 0; - if (!validate(stamp)) { - stamp = readLock(); + if (!rwLock.validate(stamp)) { + stamp = rwLock.readLock(); try { entryMap = trackerCache.get(ledgerId); count = entryMap != null ? entryMap.get(entryId) : 0; } finally { - unlockRead(stamp); + rwLock.unlockRead(stamp); } } return count; @@ -64,7 +65,7 @@ public int getRedeliveryCount(long ledgerId, long entryId) { @Override public void remove(Position position) { - long stamp = writeLock(); + long stamp = rwLock.writeLock(); try { Long2IntMap entryMap = trackerCache.get(position.getLedgerId()); if (entryMap != null) { @@ -74,7 +75,7 @@ public void remove(Position position) { } } } finally { - unlockWrite(stamp); + rwLock.unlockWrite(stamp); } } @@ -83,7 +84,7 @@ public void removeBatch(List positions) { if (positions == null) { return; } - long stamp = writeLock(); + long stamp = rwLock.writeLock(); try { for (Position position : positions) { Long2IntMap entryMap = trackerCache.get(position.getLedgerId()); @@ -95,17 +96,17 @@ public void removeBatch(List positions) { } } } finally { - unlockWrite(stamp); + rwLock.unlockWrite(stamp); } } @Override public void clear() { - long stamp = writeLock(); + long stamp = rwLock.writeLock(); try { trackerCache.clear(); } finally { - unlockWrite(stamp); + rwLock.unlockWrite(stamp); } } }