Skip to content

Commit

Permalink
Add EntryList.
Browse files Browse the repository at this point in the history
  • Loading branch information
szetszwo committed Dec 26, 2024
1 parent 003c291 commit 54dc115
Show file tree
Hide file tree
Showing 2 changed files with 64 additions and 52 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.apache.ratis.util.SizeInBytes;
import org.apache.ratis.util.TimeDuration;

import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
Expand All @@ -50,66 +51,82 @@
* An abstract implementation of {@link LogAppender}.
*/
public abstract class LogAppenderBase implements LogAppender {
/** For storing log entries to create an {@link AppendEntriesRequestProto}. */
private class EntryBuffer {
/** A queue for limiting the byte size and element size. */
/** For buffering log entries to create an {@link EntryList}. */
private static class EntryBuffer {
/** A queue for limiting the byte size, number of elements and poll time. */
private final DataQueue<EntryWithData> queue;
/** A map for releasing {@link ReferenceCountedObject}s. */
private final Map<Long, ReferenceCountedObject<EntryWithData>> map = new HashMap<>();
private final Map<Long, ReferenceCountedObject<EntryWithData>> references = new HashMap<>();

EntryBuffer() {
final RaftProperties properties = server.getRaftServer().getProperties();
EntryBuffer(Object name, RaftProperties properties) {
final SizeInBytes bufferByteLimit = RaftServerConfigKeys.Log.Appender.bufferByteLimit(properties);
final int bufferElementLimit = RaftServerConfigKeys.Log.Appender.bufferElementLimit(properties);
this.queue = new DataQueue<>(name, bufferByteLimit, bufferElementLimit, EntryWithData::getSerializedSize);
}

void retain() {
for (ReferenceCountedObject<EntryWithData> ref : map.values()) {
ref.retain();
}
}

void release() {
for (ReferenceCountedObject<EntryWithData> ref : map.values()) {
ref.release();
}
}

int size() {
return map.size();
}

boolean putNew(long index, ReferenceCountedObject<EntryWithData> ref) {
if (!queue.offer(ref.get())) {
ref.release();
boolean putNew(long index, ReferenceCountedObject<EntryWithData> retained) {
if (!queue.offer(retained.get())) {
retained.release();
return false;
}
final ReferenceCountedObject<EntryWithData> previous = map.put(index, ref);
final ReferenceCountedObject<EntryWithData> previous = references.put(index, retained);
Preconditions.assertNull(previous, () -> "previous with index " + index);
return true;
}

void releaseAndClear() {
release();
map.clear();
void releaseAllAndClear() {
for (ReferenceCountedObject<EntryWithData> ref : references.values()) {
ref.release();
}
references.clear();
queue.clear();
}

List<LogEntryProto> pollList(long heartbeatWaitTimeMs) throws RaftLogIOException {
EntryList pollList(long heartbeatWaitTimeMs) throws RaftLogIOException {
final List<LogEntryProto> protos;
try {
return queue.pollList(heartbeatWaitTimeMs, EntryWithData::getEntry, null);
} catch (RaftLogIOException e) {
releaseAndClear();
protos = queue.pollList(heartbeatWaitTimeMs, EntryWithData::getEntry, null);
} catch (Exception e) {
releaseAllAndClear();
throw e;
} finally {
for (EntryWithData entry : queue) {
// Release remaining entries.
final ReferenceCountedObject<EntryWithData> removed = map.remove(entry.getIndex());
// Remove and release remaining entries.
final ReferenceCountedObject<EntryWithData> removed = references.remove(entry.getIndex());
Objects.requireNonNull(removed, "removed == null");
removed.release();
}
queue.clear();
}
return new EntryList(protos, references);
}
}

/** Storing log entries and their references. */
private static class EntryList {
private final List<LogEntryProto> protos;
private final Collection<ReferenceCountedObject<EntryWithData>> references;

EntryList(List<LogEntryProto> protos, Map<Long, ReferenceCountedObject<EntryWithData>> references) {
Preconditions.assertSame(references.size(), protos.size(), "#entries");
this.protos = Collections.unmodifiableList(protos);
this.references = Collections.unmodifiableCollection(references.values());
}

List<LogEntryProto> getProtos() {
return protos;
}

void retain() {
for (ReferenceCountedObject<EntryWithData> ref : references) {
ref.retain();
}
}

void release() {
for (ReferenceCountedObject<EntryWithData> ref : references) {
ref.release();
}
}
}

Expand Down Expand Up @@ -293,8 +310,8 @@ public AppendEntriesRequestProto newAppendEntriesRequest(long callId, boolean he
* @param heartbeat the returned request must be a heartbeat.
*
* @return a retained reference of {@link AppendEntriesRequestProto} object.
* Since the returned reference is retained, the caller must call {@link ReferenceCountedObject#release()}}
* after use.
* Since the returned reference is retained,
* the caller must call {@link ReferenceCountedObject#release()}} after use.
*/
protected ReferenceCountedObject<AppendEntriesRequestProto> nextAppendEntriesRequest(long callId, boolean heartbeat)
throws RaftLogIOException {
Expand All @@ -317,17 +334,12 @@ protected ReferenceCountedObject<AppendEntriesRequestProto> nextAppendEntriesReq
return null;
}

final List<LogEntryProto> protos = entryBuffer.pollList(heartbeatWaitTimeMs);
Preconditions.assertSame(entryBuffer.size(), protos.size(), "#protos");
final EntryList entryList = entryBuffer.pollList(heartbeatWaitTimeMs);
final List<LogEntryProto> protos = entryList.getProtos();
assertProtos(protos, followerNext, previous, snapshotIndex);
AppendEntriesRequestProto appendEntriesProto =
leaderState.newAppendEntriesRequestProto(follower, protos, previous, callId);

final ReferenceCountedObject<AppendEntriesRequestProto> ref = ReferenceCountedObject.wrap(
appendEntriesProto, entryBuffer::retain, entryBuffer::release);
ref.retain();
entryBuffer.release();
return ref;
return ReferenceCountedObject.wrap(appendEntriesProto, entryList::retain, entryList::release);
}

private void assertProtos(List<LogEntryProto> protos, long nextIndex, TermIndex previous, long snapshotIndex) {
Expand Down Expand Up @@ -355,18 +367,18 @@ private EntryBuffer readLogEntries(long followerNext, long heartbeatWaitTimeMs)
final long halfMs = heartbeatWaitTimeMs/2;
EntryBuffer entryBuffer = null;
for (long next = followerNext; leaderNext > next && getHeartbeatWaitTimeMs() - halfMs > 0; next++) {
final ReferenceCountedObject<EntryWithData> ref;
final ReferenceCountedObject<EntryWithData> retained;
try {
ref = raftLog.retainEntryWithData(next);
retained = raftLog.retainEntryWithData(next);
if (entryBuffer == null) {
entryBuffer = new EntryBuffer();
entryBuffer = new EntryBuffer(name, server.getRaftServer().getProperties());
}
if (!entryBuffer.putNew(next, ref)) {
if (!entryBuffer.putNew(next, retained)) {
break;
}
} catch (Exception e){
} catch (Exception e) {
if (entryBuffer != null) {
entryBuffer.releaseAndClear();
entryBuffer.releaseAllAndClear();
}
throw e;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -490,7 +490,7 @@ ReferenceCountedObject<LogEntryProto> getEntryFromCache(TermIndex ti) {
* Acquire LogSegment's monitor so that there is no concurrent loading.
*/
synchronized ReferenceCountedObject<LogEntryProto> loadCache(TermIndex ti) throws RaftLogIOException {
ReferenceCountedObject<LogEntryProto> entry = entryCache.get(ti);
final ReferenceCountedObject<LogEntryProto> entry = entryCache.get(ti);
if (entry != null) {
try {
entry.retain();
Expand Down

0 comments on commit 54dc115

Please sign in to comment.