Skip to content

Commit

Permalink
[improve][broker] Reduce memory occupation of InMemoryRedeliveryTrack…
Browse files Browse the repository at this point in the history
…er. (#23640)
  • Loading branch information
thetumbled authored Nov 27, 2024
1 parent dbfde02 commit 6b694f6
Showing 1 changed file with 65 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,49 +18,95 @@
*/
package org.apache.pulsar.broker.service;

import it.unimi.dsi.fastutil.longs.Long2IntMap;
import it.unimi.dsi.fastutil.longs.Long2IntOpenHashMap;
import it.unimi.dsi.fastutil.longs.Long2ObjectMap;
import it.unimi.dsi.fastutil.longs.Long2ObjectOpenHashMap;
import java.util.List;
import java.util.concurrent.locks.StampedLock;
import org.apache.bookkeeper.mledger.Position;
import org.apache.pulsar.common.util.collections.ConcurrentLongLongPairHashMap;
import org.apache.pulsar.common.util.collections.ConcurrentLongLongPairHashMap.LongPair;

public class InMemoryRedeliveryTracker implements RedeliveryTracker {

private ConcurrentLongLongPairHashMap trackerCache = ConcurrentLongLongPairHashMap.newBuilder()
.concurrencyLevel(1)
.expectedItems(256)
.autoShrink(true)
.build();
// ledgerId -> entryId -> count
private Long2ObjectMap<Long2IntMap> trackerCache = new Long2ObjectOpenHashMap<>();
private final StampedLock rwLock = new StampedLock();

@Override
public int incrementAndGetRedeliveryCount(Position position) {
Position positionImpl = position;
LongPair count = trackerCache.get(positionImpl.getLedgerId(), positionImpl.getEntryId());
int newCount = (int) (count != null ? count.first + 1 : 1);
trackerCache.put(positionImpl.getLedgerId(), positionImpl.getEntryId(), newCount, 0L);
long stamp = rwLock.writeLock();
int newCount;
try {
Long2IntMap entryMap = trackerCache.computeIfAbsent(position.getLedgerId(),
k -> new Long2IntOpenHashMap());
newCount = entryMap.getOrDefault(position.getEntryId(), 0) + 1;
entryMap.put(position.getEntryId(), newCount);
} finally {
rwLock.unlockWrite(stamp);
}
return newCount;
}

@Override
public int getRedeliveryCount(long ledgerId, long entryId) {
LongPair count = trackerCache.get(ledgerId, entryId);
return (int) (count != null ? count.first : 0);
long stamp = rwLock.tryOptimisticRead();
Long2IntMap entryMap = trackerCache.get(ledgerId);
int count = entryMap != null ? entryMap.get(entryId) : 0;
if (!rwLock.validate(stamp)) {
stamp = rwLock.readLock();
try {
entryMap = trackerCache.get(ledgerId);
count = entryMap != null ? entryMap.get(entryId) : 0;
} finally {
rwLock.unlockRead(stamp);
}
}
return count;
}

@Override
public void remove(Position position) {
Position positionImpl = position;
trackerCache.remove(positionImpl.getLedgerId(), positionImpl.getEntryId());
long stamp = rwLock.writeLock();
try {
Long2IntMap entryMap = trackerCache.get(position.getLedgerId());
if (entryMap != null) {
entryMap.remove(position.getEntryId());
if (entryMap.isEmpty()) {
trackerCache.remove(position.getLedgerId());
}
}
} finally {
rwLock.unlockWrite(stamp);
}
}

@Override
public void removeBatch(List<Position> positions) {
if (positions != null) {
positions.forEach(this::remove);
if (positions == null) {
return;
}
long stamp = rwLock.writeLock();
try {
for (Position position : positions) {
Long2IntMap entryMap = trackerCache.get(position.getLedgerId());
if (entryMap != null) {
entryMap.remove(position.getEntryId());
if (entryMap.isEmpty()) {
trackerCache.remove(position.getLedgerId());
}
}
}
} finally {
rwLock.unlockWrite(stamp);
}
}

@Override
public void clear() {
trackerCache.clear();
long stamp = rwLock.writeLock();
try {
trackerCache.clear();
} finally {
rwLock.unlockWrite(stamp);
}
}
}

0 comments on commit 6b694f6

Please sign in to comment.