Skip to content

Commit

Permalink
RaftActor shouldn't delete committed or non-conflict entries
Browse files Browse the repository at this point in the history
  • Loading branch information
Taichi Yamakawa committed Apr 25, 2022
1 parent f77939c commit 38eaa66
Show file tree
Hide file tree
Showing 21 changed files with 380 additions and 190 deletions.
5 changes: 5 additions & 0 deletions src/main/protobuf/cluster_replication.proto
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,11 @@ message DetectedNewTerm {
// Serialized form of RaftActor.AppendedEntries: a term together with the log entries to store.
// Unlike AppendedEntries_V2_1_0, this message carries no prev_log_index.
message AppendedEntries {
required Term term = 1;
repeated LogEntry log_entries = 2;
}

// Serialized form of RaftActor.AppendedEntries_V2_1_0 (deprecated, kept for backward compatibility).
// Same fields as AppendedEntries plus the required prev_log_index.
message AppendedEntries_V2_1_0 {
required Term term = 1;
repeated LogEntry log_entries = 2;
required LogEntryIndex prev_log_index = 3;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,17 +20,18 @@ private[entityreplication] final class ClusterReplicationSerializer(val system:
// Manifests
// make them public for testing purposes
// raft
val BegunNewTermManifest = "AA"
val VotedManifest = "AB"
val DetectedNewTermManifest = "AC"
val AppendedEntriesManifest = "AD"
val AppendedEventManifest = "AE"
val CompactionCompletedManifest = "AF"
val SnapshotSyncCompletedManifest = "AG"
val PersistentStateManifest = "AH"
val CommandManifest = "AI"
val ForwardedCommandManifest = "AJ"
val SnapshotSyncStartedManifest = "AK"
val BegunNewTermManifest = "AA"
val VotedManifest = "AB"
val DetectedNewTermManifest = "AC"
// "AD" used to be the manifest of AppendedEntries while that event still carried prevLogIndex.
// It now identifies the AppendedEntries_V2_1_0 format
// (presumably so that events already stored under "AD" keep deserializing - confirm against the migration notes).
val AppendedEntriesManifest_V2_1_0 = "AD"
val AppendedEventManifest = "AE"
val CompactionCompletedManifest = "AF"
val SnapshotSyncCompletedManifest = "AG"
val PersistentStateManifest = "AH"
val CommandManifest = "AI"
val ForwardedCommandManifest = "AJ"
val SnapshotSyncStartedManifest = "AK"
// New manifest for the current AppendedEntries format (term + log entries, no prevLogIndex).
val AppendedEntriesManifest = "AL"
// raft.eventsourced
val CommitLogStoreInternalEventManifest = "BA"
val CommitLogStoreSaveManifest = "BB"
Expand Down Expand Up @@ -65,17 +66,18 @@ private[entityreplication] final class ClusterReplicationSerializer(val system:
// Manifest -> fromBinary
private val fromBinaryMap = HashMap[String, Array[Byte] => ClusterReplicationSerializable](
// raft
BegunNewTermManifest -> begunNewTermFromBinary,
VotedManifest -> votedFromBinary,
DetectedNewTermManifest -> detectedNewTermFromBinary,
AppendedEntriesManifest -> appendedEntriesFromBinary,
AppendedEventManifest -> appendedEventFromBinary,
CompactionCompletedManifest -> compactionCompletedFromBinary,
SnapshotSyncStartedManifest -> snapshotSyncStartedFromBinary,
SnapshotSyncCompletedManifest -> snapshotSyncCompletedFromBinary,
PersistentStateManifest -> persistentStateFromBinary,
CommandManifest -> commandFromBinary,
ForwardedCommandManifest -> forwardedCommandFromBinary,
BegunNewTermManifest -> begunNewTermFromBinary,
VotedManifest -> votedFromBinary,
DetectedNewTermManifest -> detectedNewTermFromBinary,
AppendedEntriesManifest -> appendedEntriesFromBinary,
AppendedEntriesManifest_V2_1_0 -> appendedEntries_V2_1_0_FromBinary,
AppendedEventManifest -> appendedEventFromBinary,
CompactionCompletedManifest -> compactionCompletedFromBinary,
SnapshotSyncStartedManifest -> snapshotSyncStartedFromBinary,
SnapshotSyncCompletedManifest -> snapshotSyncCompletedFromBinary,
PersistentStateManifest -> persistentStateFromBinary,
CommandManifest -> commandFromBinary,
ForwardedCommandManifest -> forwardedCommandFromBinary,
// raft.eventsourced
CommitLogStoreInternalEventManifest -> commitLogStoreInternalEventFromBinary,
CommitLogStoreSaveManifest -> commitLogStoreSaveFromBinary,
Expand Down Expand Up @@ -142,6 +144,7 @@ private[entityreplication] final class ClusterReplicationSerializer(val system:
case _: raft.RaftActor.Voted => VotedManifest
case _: raft.RaftActor.DetectedNewTerm => DetectedNewTermManifest
case _: raft.RaftActor.AppendedEntries => AppendedEntriesManifest
case _: raft.RaftActor.AppendedEntries_V2_1_0 => AppendedEntriesManifest_V2_1_0
case _: raft.RaftActor.AppendedEvent => AppendedEventManifest
case _: raft.RaftActor.CompactionCompleted => CompactionCompletedManifest
case _: raft.RaftActor.SnapshotSyncStarted => SnapshotSyncStartedManifest
Expand Down Expand Up @@ -189,6 +192,7 @@ private[entityreplication] final class ClusterReplicationSerializer(val system:
case m: raft.RaftActor.Voted => votedToBinary(m)
case m: raft.RaftActor.DetectedNewTerm => detectedNewTermToBinary(m)
case m: raft.RaftActor.AppendedEntries => appendedEntriesToBinary(m)
case m: raft.RaftActor.AppendedEntries_V2_1_0 => appendedEntries_V2_1_0_ToBinary(m)
case m: raft.RaftActor.AppendedEvent => appendedEventToBinary(m)
case m: raft.RaftActor.CompactionCompleted => compactionCompletedToBinary(m)
case m: raft.RaftActor.SnapshotSyncStarted => snapshotSyncStartedToBinary(m)
Expand Down Expand Up @@ -283,7 +287,6 @@ private[entityreplication] final class ClusterReplicationSerializer(val system:
.of(
term = termToProto(message.term),
logEntries = message.logEntries.map(logEntryToProto),
prevLogIndex = logEntryIndexToProto(message.prevLogIndex),
).toByteArray
}

Expand All @@ -292,6 +295,25 @@ private[entityreplication] final class ClusterReplicationSerializer(val system:
raft.RaftActor.AppendedEntries(
term = termFromProto(proto.term),
logEntries = proto.logEntries.map(logEntryFromProto),
)
}

/** Serializes the deprecated [[raft.RaftActor.AppendedEntries_V2_1_0]] event into its protobuf bytes.
  *
  * The deprecation warning is silenced on purpose: this serializer has to keep handling the legacy event.
  *
  * @param message the legacy event (term, log entries and prevLogIndex)
  * @return the bytes of the corresponding `msg.AppendedEntries_V2_1_0` protobuf message
  */
@nowarn("msg=Use RaftActor.AppendedEntries instead.")
private def appendedEntries_V2_1_0_ToBinary(message: raft.RaftActor.AppendedEntries_V2_1_0): Array[Byte] = {
  val proto = msg.AppendedEntries_V2_1_0.of(
    term = termToProto(message.term),
    logEntries = message.logEntries.map(logEntryToProto),
    prevLogIndex = logEntryIndexToProto(message.prevLogIndex),
  )
  proto.toByteArray
}

/** Deserializes protobuf bytes into the deprecated [[raft.RaftActor.AppendedEntries_V2_1_0]] event.
  *
  * The deprecation warning is silenced on purpose: this serializer has to keep handling the legacy event.
  *
  * @param bytes bytes of a `msg.AppendedEntries_V2_1_0` protobuf message
  * @return the legacy event rebuilt from the parsed term, log entries and prevLogIndex
  */
@nowarn("msg=Use RaftActor.AppendedEntries instead.")
private def appendedEntries_V2_1_0_FromBinary(bytes: Array[Byte]): raft.RaftActor.AppendedEntries_V2_1_0 = {
  val parsed             = msg.AppendedEntries_V2_1_0.parseFrom(bytes)
  val decodedTerm        = termFromProto(parsed.term)
  val decodedEntries     = parsed.logEntries.map(logEntryFromProto)
  val decodedPrevLogIndex = logEntryIndexFromProto(parsed.prevLogIndex)
  raft.RaftActor.AppendedEntries_V2_1_0(
    term = decodedTerm,
    logEntries = decodedEntries,
    prevLogIndex = decodedPrevLogIndex,
  )
}
Expand Down
20 changes: 10 additions & 10 deletions src/main/scala/lerna/akka/entityreplication/raft/Candidate.scala
Original file line number Diff line number Diff line change
Expand Up @@ -150,16 +150,16 @@ private[raft] trait Candidate { this: RaftActor =>
become(Follower)
}
} else {
applyDomainEvent(AppendedEntries(appendEntries.term, appendEntries.entries, appendEntries.prevLogIndex)) {
domainEvent =>
applyDomainEvent(FollowedLeaderCommit(appendEntries.leader, appendEntries.leaderCommit)) { _ =>
sender() ! AppendEntriesSucceeded(
domainEvent.term,
currentData.replicatedLog.lastLogIndex,
selfMemberIndex,
)
become(Follower)
}
val newEntries = currentData.resolveNewLogEntries(appendEntries.entries)
applyDomainEvent(AppendedEntries(appendEntries.term, newEntries)) { domainEvent =>
applyDomainEvent(FollowedLeaderCommit(appendEntries.leader, appendEntries.leaderCommit)) { _ =>
sender() ! AppendEntriesSucceeded(
domainEvent.term,
currentData.replicatedLog.lastLogIndex,
selfMemberIndex,
)
become(Follower)
}
}
}
} else { // prevLogIndex と prevLogTerm がマッチするエントリが無かった
Expand Down
20 changes: 10 additions & 10 deletions src/main/scala/lerna/akka/entityreplication/raft/Follower.scala
Original file line number Diff line number Diff line change
Expand Up @@ -107,16 +107,16 @@ private[raft] trait Follower { this: RaftActor =>
become(Follower)
}
} else {
applyDomainEvent(AppendedEntries(appendEntries.term, appendEntries.entries, appendEntries.prevLogIndex)) {
domainEvent =>
applyDomainEvent(FollowedLeaderCommit(appendEntries.leader, appendEntries.leaderCommit)) { _ =>
sender() ! AppendEntriesSucceeded(
domainEvent.term,
currentData.replicatedLog.lastLogIndex,
selfMemberIndex,
)
become(Follower)
}
val newEntries = currentData.resolveNewLogEntries(appendEntries.entries)
applyDomainEvent(AppendedEntries(appendEntries.term, newEntries)) { domainEvent =>
applyDomainEvent(FollowedLeaderCommit(appendEntries.leader, appendEntries.leaderCommit)) { _ =>
sender() ! AppendEntriesSucceeded(
domainEvent.term,
currentData.replicatedLog.lastLogIndex,
selfMemberIndex,
)
become(Follower)
}
}
}
} else { // prevLogIndex と prevLogTerm がマッチするエントリが無かった
Expand Down
20 changes: 10 additions & 10 deletions src/main/scala/lerna/akka/entityreplication/raft/Leader.scala
Original file line number Diff line number Diff line change
Expand Up @@ -96,16 +96,16 @@ private[raft] trait Leader { this: RaftActor =>
if (currentData.hasMatchLogEntry(appendEntries.prevLogIndex, appendEntries.prevLogTerm)) {
cancelHeartbeatTimeoutTimer()
if (log.isDebugEnabled) log.debug("=== [Leader] append {} ===", appendEntries)
applyDomainEvent(AppendedEntries(appendEntries.term, appendEntries.entries, appendEntries.prevLogIndex)) {
domainEvent =>
applyDomainEvent(FollowedLeaderCommit(appendEntries.leader, appendEntries.leaderCommit)) { _ =>
sender() ! AppendEntriesSucceeded(
domainEvent.term,
currentData.replicatedLog.lastLogIndex,
selfMemberIndex,
)
become(Follower)
}
val newEntries = currentData.resolveNewLogEntries(appendEntries.entries)
applyDomainEvent(AppendedEntries(appendEntries.term, newEntries)) { domainEvent =>
applyDomainEvent(FollowedLeaderCommit(appendEntries.leader, appendEntries.leaderCommit)) { _ =>
sender() ! AppendEntriesSucceeded(
domainEvent.term,
currentData.replicatedLog.lastLogIndex,
selfMemberIndex,
)
become(Follower)
}
}
} else { // prevLogIndex と prevLogTerm がマッチするエントリが無かった
if (log.isDebugEnabled) log.debug("=== [Leader] could not append {} ===", appendEntries)
Expand Down
26 changes: 24 additions & 2 deletions src/main/scala/lerna/akka/entityreplication/raft/RaftActor.scala
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ import lerna.akka.entityreplication.raft.snapshot.sync.SnapshotSyncManager
import lerna.akka.entityreplication.util.ActorIds
import lerna.akka.entityreplication.{ ClusterReplicationSerializable, ReplicationActorContext, ReplicationRegion }

import scala.annotation.nowarn

private[entityreplication] object RaftActor {

def props(
Expand Down Expand Up @@ -79,9 +81,24 @@ private[entityreplication] object RaftActor {
final case class BegunNewTerm(term: Term) extends PersistEvent with ClusterReplicationSerializable
final case class Voted(term: Term, candidate: MemberIndex) extends PersistEvent with ClusterReplicationSerializable
final case class DetectedNewTerm(term: Term) extends PersistEvent with ClusterReplicationSerializable
final case class AppendedEntries(term: Term, logEntries: Seq[LogEntry], prevLogIndex: LogEntryIndex)

/** Persisted event holding a term (possibly a new term) and the log entries to store.
*
* The log entries must not include entries that already exist on this RaftActor's raft log.
* When the event is applied, the RaftActor syncs its term and then truncates its log if the
* entries conflict with it, before appending the entries.
*
* Note that the indices of the entries MUST be continuously increasing
* (this is not checked when an instance of this class is created).
*/
final case class AppendedEntries(term: Term, logEntries: Seq[LogEntry])
extends PersistEvent
with ClusterReplicationSerializable

/** Legacy form of the AppendedEntries event that additionally carries `prevLogIndex`.
*
* This class is for backward compatibility. When applied, the RaftActor syncs its term and
* then calls `appendEntries(logEntries, prevLogIndex)` on its data.
*/
@deprecated("Use RaftActor.AppendedEntries instead.", "2.1.1")
final case class AppendedEntries_V2_1_0(term: Term, logEntries: Seq[LogEntry], prevLogIndex: LogEntryIndex)
extends PersistEvent
with ClusterReplicationSerializable

final case class AppendedEvent(event: EntityEvent) extends PersistEvent with ClusterReplicationSerializable
final case class CompactionCompleted(
memberIndex: MemberIndex,
Expand Down Expand Up @@ -199,6 +216,7 @@ private[raft] class RaftActor(

val numberOfMembers: Int = settings.replicationFactor

@nowarn("msg=Use RaftMemberData.truncateAndAppendEntries instead.")
protected def updateState(domainEvent: DomainEvent): RaftMemberData =
domainEvent match {
case BegunNewTerm(term) =>
Expand All @@ -207,7 +225,11 @@ private[raft] class RaftActor(
currentData.vote(candidate, term)
case DetectedNewTerm(term) =>
currentData.syncTerm(term)
case AppendedEntries(term, logEntries, prevLogIndex) =>
case AppendedEntries(term, logEntries) =>
currentData
.syncTerm(term)
.truncateAndAppendEntries(logEntries)
case AppendedEntries_V2_1_0(term, logEntries, prevLogIndex) =>
currentData
.syncTerm(term)
.appendEntries(logEntries, prevLogIndex)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import lerna.akka.entityreplication.model.NormalizedEntityId
import lerna.akka.entityreplication.raft.model._
import lerna.akka.entityreplication.raft.routing.MemberIndex
import lerna.akka.entityreplication.raft.snapshot.SnapshotProtocol.EntitySnapshotMetadata
import org.slf4j.{ Logger, LoggerFactory }

private[entityreplication] object PersistentStateData {

Expand Down Expand Up @@ -104,12 +105,78 @@ private[entityreplication] trait FollowerData { self: RaftMemberData =>
updatePersistentState(currentTerm = term, votedFor = Some(candidate))
}

/** Narrows the given entries down to the ones this member still has to store.
  *
  * The given entries may overlap or conflict with the existing entries of the replicated log.
  * [[ReplicatedLog.findConflict]] describes what conflicting means.
  *
  *  - If no conflict is found, every entry whose index is less than or equal to the last log index is dropped.
  *  - If a conflict is found, the returned entries begin with the conflicting entry.
  *
  * Note that the indices of the given entries MUST be continuously increasing (not checked by this method).
  *
  * @param logEntries entries received for appending; when non-empty, the first index must be less than
  *                   or equal to the last index of the existing entries plus one
  * @return the suffix of `logEntries` that is new to (or conflicts with) the replicated log
  * @throws IllegalArgumentException if the given entries start beyond the last log index plus one,
  *                                  since the raft log would miss some entries
  * @see [[ReplicatedLog.findConflict]]
  */
def resolveNewLogEntries(logEntries: Seq[LogEntry]): Seq[LogEntry] = {
  import ReplicatedLog.FindConflictResult
  require(
    logEntries.headOption.forall(_.index <= replicatedLog.lastLogIndex.plus(1)),
    s"The given non-empty log entries (indices: [${logEntries.head.index}..${logEntries.last.index}]) should start with an index " +
    s"less than or equal to lastLogIndex[${replicatedLog.lastLogIndex}] + 1. " +
    "If this requirement breaks, the raft log will miss some entries.",
  )
  replicatedLog.findConflict(logEntries) match {
    case FindConflictResult.ConflictFound(conflictIndex, conflictTerm) =>
      if (log.isInfoEnabled)
        log.info(
          "found conflict at index [{}] (existing term: [{}], conflicting term: [{}]).",
          conflictIndex,
          replicatedLog.termAt(conflictIndex),
          conflictTerm,
        )
      // A conflict is only legal inside the uncommitted, not-yet-compacted part of the existing log.
      assert(
        conflictIndex > commitIndex,
        s"The entry with index [$conflictIndex] should not conflict with the committed entry (commitIndex [$commitIndex])",
      )
      assert(
        conflictIndex > replicatedLog.ancestorLastIndex,
        s"The entry with index [$conflictIndex] should not conflict with the compacted entry (ancestorLastIndex [${replicatedLog.ancestorLastIndex}])",
      )
      assert(
        conflictIndex <= replicatedLog.lastLogIndex,
        s"The entry with index [$conflictIndex] should conflict with an exising entry, but didn't (lastLogIndex [${replicatedLog.lastLogIndex}])",
      )
      // Keep the conflicting entry and everything after it.
      val entriesFromConflict = logEntries.dropWhile(_.index < conflictIndex)
      assert(
        entriesFromConflict.nonEmpty && entriesFromConflict.head.index == conflictIndex,
        s"The new entries (containing conflicts, size=[${entriesFromConflict.size}]) should always be non-empty and start with the conflict Index [$conflictIndex]",
      )
      entriesFromConflict

    case FindConflictResult.NoConflict =>
      // Everything up to the last log index is already stored; only the remainder is new.
      val unseenEntries = logEntries.dropWhile(_.index <= replicatedLog.lastLogIndex)
      unseenEntries.ensuring(
        _.headOption.forall(_.index == replicatedLog.lastLogIndex.plus(1)),
        s"The new entries (not-empty) should start with lastLogIndex ${replicatedLog.lastLogIndex} + 1.",
      )
  }
}

/** Merges the given entries into the replicated log via `ReplicatedLog.merge` and stores the result.
  *
  * @param logEntries   entries to merge
  * @param prevLogIndex index passed to `merge` together with the entries
  * @return the member data holding the merged replicated log
  */
@deprecated("Use RaftMemberData.truncateAndAppendEntries instead.", "2.1.1")
def appendEntries(logEntries: Seq[LogEntry], prevLogIndex: LogEntryIndex): RaftMemberData = {
  val mergedLog = replicatedLog.merge(logEntries, prevLogIndex)
  updatePersistentState(replicatedLog = mergedLog)
}

/** Stores the result of `ReplicatedLog.truncateAndAppend` for the given entries as the new replicated log.
  *
  * @param logEntries entries handed to `truncateAndAppend`
  * @return the member data holding the updated replicated log
  */
def truncateAndAppendEntries(logEntries: Seq[LogEntry]): RaftMemberData = {
  val updatedLog = replicatedLog.truncateAndAppend(logEntries)
  updatePersistentState(replicatedLog = updatedLog)
}

/** Records the given member as the detected leader in the follower's volatile state.
  *
  * @param leaderMember index of the member detected as leader
  * @return the member data with `leaderMember` set to `Some(leaderMember)`
  */
def detectLeaderMember(leaderMember: MemberIndex): RaftMemberData = {
  val detectedLeader = Some(leaderMember)
  updateFollowerVolatileState(leaderMember = detectedLeader)
}
Expand Down Expand Up @@ -352,6 +419,8 @@ private[entityreplication] trait RaftMemberData
with LeaderData
with ShardData {

// Logger named after the concrete class (`getClass`). It is also used by traits that declare
// `self: RaftMemberData =>`, e.g. FollowerData.resolveNewLogEntries logs detected conflicts with it.
protected val log: Logger = LoggerFactory.getLogger(getClass)

protected def selectApplicableLogEntries: Seq[LogEntry] =
if (commitIndex > lastApplied) {
replicatedLog.sliceEntries(from = lastApplied.next(), to = commitIndex)
Expand Down
Loading

0 comments on commit 38eaa66

Please sign in to comment.