Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

RaftActor shouldn't delete committed or non-conflict entries #151

Merged
merged 18 commits into from
Apr 27, 2022
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
# It's OK to exclude the following since protobuf.msg.AppendedEntries is not intend to use by users.
ProblemFilters.exclude[DirectMissingMethodProblem]("lerna.akka.entityreplication.protobuf.msg.AppendedEntries.<init>$default$4")
ProblemFilters.exclude[IncompatibleSignatureProblem]("lerna.akka.entityreplication.protobuf.msg.AppendedEntries.unapply")
ProblemFilters.exclude[DirectMissingMethodProblem]("lerna.akka.entityreplication.protobuf.msg.AppendedEntries.apply$default$4")
ProblemFilters.exclude[DirectMissingMethodProblem]("lerna.akka.entityreplication.protobuf.msg.AppendedEntries.apply")
ProblemFilters.exclude[DirectMissingMethodProblem]("lerna.akka.entityreplication.protobuf.msg.AppendedEntries.of")
ProblemFilters.exclude[DirectMissingMethodProblem]("lerna.akka.entityreplication.protobuf.msg.AppendedEntries.PREV_LOG_INDEX_FIELD_NUMBER")
ProblemFilters.exclude[DirectMissingMethodProblem]("lerna.akka.entityreplication.protobuf.msg.AppendedEntries.prevLogIndex")
ProblemFilters.exclude[DirectMissingMethodProblem]("lerna.akka.entityreplication.protobuf.msg.AppendedEntries.withPrevLogIndex")
ProblemFilters.exclude[DirectMissingMethodProblem]("lerna.akka.entityreplication.protobuf.msg.AppendedEntries.copy")
ProblemFilters.exclude[IncompatibleResultTypeProblem]("lerna.akka.entityreplication.protobuf.msg.AppendedEntries.copy$default$3")
ProblemFilters.exclude[DirectMissingMethodProblem]("lerna.akka.entityreplication.protobuf.msg.AppendedEntries.copy$default$4")
ProblemFilters.exclude[DirectMissingMethodProblem]("lerna.akka.entityreplication.protobuf.msg.AppendedEntries.this")

# It's safe to exclude the following since RaftActor#AppendedEntries is package-private.
ProblemFilters.exclude[DirectMissingMethodProblem]("lerna.akka.entityreplication.raft.RaftActor#AppendedEntries.prevLogIndex")
ProblemFilters.exclude[DirectMissingMethodProblem]("lerna.akka.entityreplication.raft.RaftActor#AppendedEntries.copy")
ProblemFilters.exclude[DirectMissingMethodProblem]("lerna.akka.entityreplication.raft.RaftActor#AppendedEntries.copy$default$3")
ProblemFilters.exclude[DirectMissingMethodProblem]("lerna.akka.entityreplication.raft.RaftActor#AppendedEntries.this")
ProblemFilters.exclude[MissingTypesProblem]("lerna.akka.entityreplication.raft.RaftActor$AppendedEntries$")
ProblemFilters.exclude[DirectMissingMethodProblem]("lerna.akka.entityreplication.raft.RaftActor#AppendedEntries.apply")
ProblemFilters.exclude[IncompatibleSignatureProblem]("lerna.akka.entityreplication.raft.RaftActor#AppendedEntries.unapply")
5 changes: 5 additions & 0 deletions src/main/protobuf/cluster_replication.proto
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,11 @@ message DetectedNewTerm {
message AppendedEntries {
required Term term = 1;
repeated LogEntry log_entries = 2;
}

message AppendedEntries_V2_1_0 {
negokaz marked this conversation as resolved.
Show resolved Hide resolved
required Term term = 1;
repeated LogEntry log_entries = 2;
required LogEntryIndex prev_log_index = 3;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,17 +20,18 @@ private[entityreplication] final class ClusterReplicationSerializer(val system:
// Manifests
// make them public for testing purposes
// raft
val BegunNewTermManifest = "AA"
val VotedManifest = "AB"
val DetectedNewTermManifest = "AC"
val AppendedEntriesManifest = "AD"
val AppendedEventManifest = "AE"
val CompactionCompletedManifest = "AF"
val SnapshotSyncCompletedManifest = "AG"
val PersistentStateManifest = "AH"
val CommandManifest = "AI"
val ForwardedCommandManifest = "AJ"
val SnapshotSyncStartedManifest = "AK"
val BegunNewTermManifest = "AA"
val VotedManifest = "AB"
val DetectedNewTermManifest = "AC"
val AppendedEntriesManifest_V2_1_0 = "AD"
val AppendedEventManifest = "AE"
val CompactionCompletedManifest = "AF"
val SnapshotSyncCompletedManifest = "AG"
val PersistentStateManifest = "AH"
val CommandManifest = "AI"
val ForwardedCommandManifest = "AJ"
val SnapshotSyncStartedManifest = "AK"
val AppendedEntriesManifest = "AL"
// raft.eventsourced
val CommitLogStoreInternalEventManifest = "BA"
val CommitLogStoreSaveManifest = "BB"
Expand Down Expand Up @@ -65,17 +66,18 @@ private[entityreplication] final class ClusterReplicationSerializer(val system:
// Manifest -> fromBinary
private val fromBinaryMap = HashMap[String, Array[Byte] => ClusterReplicationSerializable](
// raft
BegunNewTermManifest -> begunNewTermFromBinary,
VotedManifest -> votedFromBinary,
DetectedNewTermManifest -> detectedNewTermFromBinary,
AppendedEntriesManifest -> appendedEntriesFromBinary,
AppendedEventManifest -> appendedEventFromBinary,
CompactionCompletedManifest -> compactionCompletedFromBinary,
SnapshotSyncStartedManifest -> snapshotSyncStartedFromBinary,
SnapshotSyncCompletedManifest -> snapshotSyncCompletedFromBinary,
PersistentStateManifest -> persistentStateFromBinary,
CommandManifest -> commandFromBinary,
ForwardedCommandManifest -> forwardedCommandFromBinary,
BegunNewTermManifest -> begunNewTermFromBinary,
VotedManifest -> votedFromBinary,
DetectedNewTermManifest -> detectedNewTermFromBinary,
AppendedEntriesManifest -> appendedEntriesFromBinary,
AppendedEntriesManifest_V2_1_0 -> appendedEntries_V2_1_0_FromBinary,
AppendedEventManifest -> appendedEventFromBinary,
CompactionCompletedManifest -> compactionCompletedFromBinary,
SnapshotSyncStartedManifest -> snapshotSyncStartedFromBinary,
SnapshotSyncCompletedManifest -> snapshotSyncCompletedFromBinary,
PersistentStateManifest -> persistentStateFromBinary,
CommandManifest -> commandFromBinary,
ForwardedCommandManifest -> forwardedCommandFromBinary,
// raft.eventsourced
CommitLogStoreInternalEventManifest -> commitLogStoreInternalEventFromBinary,
CommitLogStoreSaveManifest -> commitLogStoreSaveFromBinary,
Expand Down Expand Up @@ -142,6 +144,7 @@ private[entityreplication] final class ClusterReplicationSerializer(val system:
case _: raft.RaftActor.Voted => VotedManifest
case _: raft.RaftActor.DetectedNewTerm => DetectedNewTermManifest
case _: raft.RaftActor.AppendedEntries => AppendedEntriesManifest
case _: raft.RaftActor.AppendedEntries_V2_1_0 => AppendedEntriesManifest_V2_1_0
case _: raft.RaftActor.AppendedEvent => AppendedEventManifest
case _: raft.RaftActor.CompactionCompleted => CompactionCompletedManifest
case _: raft.RaftActor.SnapshotSyncStarted => SnapshotSyncStartedManifest
Expand Down Expand Up @@ -189,6 +192,7 @@ private[entityreplication] final class ClusterReplicationSerializer(val system:
case m: raft.RaftActor.Voted => votedToBinary(m)
case m: raft.RaftActor.DetectedNewTerm => detectedNewTermToBinary(m)
case m: raft.RaftActor.AppendedEntries => appendedEntriesToBinary(m)
case m: raft.RaftActor.AppendedEntries_V2_1_0 => appendedEntries_V2_1_0_ToBinary(m)
case m: raft.RaftActor.AppendedEvent => appendedEventToBinary(m)
case m: raft.RaftActor.CompactionCompleted => compactionCompletedToBinary(m)
case m: raft.RaftActor.SnapshotSyncStarted => snapshotSyncStartedToBinary(m)
Expand Down Expand Up @@ -283,7 +287,6 @@ private[entityreplication] final class ClusterReplicationSerializer(val system:
.of(
term = termToProto(message.term),
logEntries = message.logEntries.map(logEntryToProto),
prevLogIndex = logEntryIndexToProto(message.prevLogIndex),
).toByteArray
}

Expand All @@ -292,6 +295,25 @@ private[entityreplication] final class ClusterReplicationSerializer(val system:
raft.RaftActor.AppendedEntries(
term = termFromProto(proto.term),
logEntries = proto.logEntries.map(logEntryFromProto),
)
}

@nowarn("msg=Use RaftActor.AppendedEntries instead.")
private def appendedEntries_V2_1_0_ToBinary(message: raft.RaftActor.AppendedEntries_V2_1_0): Array[Byte] = {
msg.AppendedEntries_V2_1_0
.of(
term = termToProto(message.term),
logEntries = message.logEntries.map(logEntryToProto),
prevLogIndex = logEntryIndexToProto(message.prevLogIndex),
).toByteArray
}

@nowarn("msg=Use RaftActor.AppendedEntries instead.")
private def appendedEntries_V2_1_0_FromBinary(bytes: Array[Byte]): raft.RaftActor.AppendedEntries_V2_1_0 = {
val proto = msg.AppendedEntries_V2_1_0.parseFrom(bytes)
raft.RaftActor.AppendedEntries_V2_1_0(
term = termFromProto(proto.term),
logEntries = proto.logEntries.map(logEntryFromProto),
prevLogIndex = logEntryIndexFromProto(proto.prevLogIndex),
)
}
Expand Down
20 changes: 10 additions & 10 deletions src/main/scala/lerna/akka/entityreplication/raft/Candidate.scala
Original file line number Diff line number Diff line change
Expand Up @@ -150,16 +150,16 @@ private[raft] trait Candidate { this: RaftActor =>
become(Follower)
}
} else {
applyDomainEvent(AppendedEntries(appendEntries.term, appendEntries.entries, appendEntries.prevLogIndex)) {
domainEvent =>
applyDomainEvent(FollowedLeaderCommit(appendEntries.leader, appendEntries.leaderCommit)) { _ =>
sender() ! AppendEntriesSucceeded(
domainEvent.term,
currentData.replicatedLog.lastLogIndex,
selfMemberIndex,
)
become(Follower)
}
val newEntries = currentData.resolveNewLogEntries(appendEntries.entries)
applyDomainEvent(AppendedEntries(appendEntries.term, newEntries)) { domainEvent =>
applyDomainEvent(FollowedLeaderCommit(appendEntries.leader, appendEntries.leaderCommit)) { _ =>
sender() ! AppendEntriesSucceeded(
domainEvent.term,
currentData.replicatedLog.lastLogIndex,
selfMemberIndex,
)
become(Follower)
}
}
}
} else { // prevLogIndex と prevLogTerm がマッチするエントリが無かった
Expand Down
20 changes: 10 additions & 10 deletions src/main/scala/lerna/akka/entityreplication/raft/Follower.scala
Original file line number Diff line number Diff line change
Expand Up @@ -107,16 +107,16 @@ private[raft] trait Follower { this: RaftActor =>
become(Follower)
}
} else {
applyDomainEvent(AppendedEntries(appendEntries.term, appendEntries.entries, appendEntries.prevLogIndex)) {
domainEvent =>
applyDomainEvent(FollowedLeaderCommit(appendEntries.leader, appendEntries.leaderCommit)) { _ =>
sender() ! AppendEntriesSucceeded(
domainEvent.term,
currentData.replicatedLog.lastLogIndex,
selfMemberIndex,
)
become(Follower)
}
val newEntries = currentData.resolveNewLogEntries(appendEntries.entries)
applyDomainEvent(AppendedEntries(appendEntries.term, newEntries)) { domainEvent =>
applyDomainEvent(FollowedLeaderCommit(appendEntries.leader, appendEntries.leaderCommit)) { _ =>
sender() ! AppendEntriesSucceeded(
domainEvent.term,
currentData.replicatedLog.lastLogIndex,
selfMemberIndex,
)
become(Follower)
}
}
}
} else { // prevLogIndex と prevLogTerm がマッチするエントリが無かった
Expand Down
20 changes: 10 additions & 10 deletions src/main/scala/lerna/akka/entityreplication/raft/Leader.scala
Original file line number Diff line number Diff line change
Expand Up @@ -96,16 +96,16 @@ private[raft] trait Leader { this: RaftActor =>
if (currentData.hasMatchLogEntry(appendEntries.prevLogIndex, appendEntries.prevLogTerm)) {
cancelHeartbeatTimeoutTimer()
if (log.isDebugEnabled) log.debug("=== [Leader] append {} ===", appendEntries)
applyDomainEvent(AppendedEntries(appendEntries.term, appendEntries.entries, appendEntries.prevLogIndex)) {
domainEvent =>
applyDomainEvent(FollowedLeaderCommit(appendEntries.leader, appendEntries.leaderCommit)) { _ =>
sender() ! AppendEntriesSucceeded(
domainEvent.term,
currentData.replicatedLog.lastLogIndex,
selfMemberIndex,
)
become(Follower)
}
val newEntries = currentData.resolveNewLogEntries(appendEntries.entries)
applyDomainEvent(AppendedEntries(appendEntries.term, newEntries)) { domainEvent =>
applyDomainEvent(FollowedLeaderCommit(appendEntries.leader, appendEntries.leaderCommit)) { _ =>
sender() ! AppendEntriesSucceeded(
domainEvent.term,
currentData.replicatedLog.lastLogIndex,
selfMemberIndex,
)
become(Follower)
}
}
} else { // prevLogIndex と prevLogTerm がマッチするエントリが無かった
if (log.isDebugEnabled) log.debug("=== [Leader] could not append {} ===", appendEntries)
Expand Down
26 changes: 24 additions & 2 deletions src/main/scala/lerna/akka/entityreplication/raft/RaftActor.scala
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ import lerna.akka.entityreplication.raft.snapshot.sync.SnapshotSyncManager
import lerna.akka.entityreplication.util.ActorIds
import lerna.akka.entityreplication.{ ClusterReplicationSerializable, ReplicationActorContext, ReplicationRegion }

import scala.annotation.nowarn

private[entityreplication] object RaftActor {

def props(
Expand Down Expand Up @@ -79,9 +81,24 @@ private[entityreplication] object RaftActor {
final case class BegunNewTerm(term: Term) extends PersistEvent with ClusterReplicationSerializable
final case class Voted(term: Term, candidate: MemberIndex) extends PersistEvent with ClusterReplicationSerializable
final case class DetectedNewTerm(term: Term) extends PersistEvent with ClusterReplicationSerializable
final case class AppendedEntries(term: Term, logEntries: Seq[LogEntry], prevLogIndex: LogEntryIndex)

/** AppendedEntries event contains a term (possibly a new term) and log entries
*
* The log entries include no existing entries on this RaftActor's raft log. RaftActor should truncate
* its log entries if the entries conflict with its log, and then append the entries.
*
* Note that the index of the entries MUST be continuously increasing (not checked at instantiating an instance of this class)
*/
final case class AppendedEntries(term: Term, logEntries: Seq[LogEntry])
extends PersistEvent
with ClusterReplicationSerializable

@deprecated("Use RaftActor.AppendedEntries instead.", "2.1.1")
/** This class is for backward compatibility */
final case class AppendedEntries_V2_1_0(term: Term, logEntries: Seq[LogEntry], prevLogIndex: LogEntryIndex)
extends PersistEvent
with ClusterReplicationSerializable

final case class AppendedEvent(event: EntityEvent) extends PersistEvent with ClusterReplicationSerializable
final case class CompactionCompleted(
memberIndex: MemberIndex,
Expand Down Expand Up @@ -199,6 +216,7 @@ private[raft] class RaftActor(

val numberOfMembers: Int = settings.replicationFactor

@nowarn("msg=Use RaftMemberData.truncateAndAppendEntries instead.")
protected def updateState(domainEvent: DomainEvent): RaftMemberData =
domainEvent match {
case BegunNewTerm(term) =>
Expand All @@ -207,7 +225,11 @@ private[raft] class RaftActor(
currentData.vote(candidate, term)
case DetectedNewTerm(term) =>
currentData.syncTerm(term)
case AppendedEntries(term, logEntries, prevLogIndex) =>
case AppendedEntries(term, logEntries) =>
currentData
.syncTerm(term)
.truncateAndAppendEntries(logEntries)
case AppendedEntries_V2_1_0(term, logEntries, prevLogIndex) =>
currentData
.syncTerm(term)
.appendEntries(logEntries, prevLogIndex)
Expand Down
Loading