Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Skip unnecessary snapshot synchronization #168

Merged
merged 2 commits into from
Aug 4, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- A RaftAcotor(Leader) could mis-deliver a ReplicationSucceeded message to a different entity
[156](https://github.com/lerna-stack/akka-entity-replication/issues/156),
[#PR162](https://github.com/lerna-stack/akka-entity-replication/pull/162)
- Snapshot synchronization could remove committed log entries that not be included in snapshots
[#167](https://github.com/lerna-stack/akka-entity-replication/issues/167)
[#PR168](https://github.com/lerna-stack/akka-entity-replication/pull/168)

## [v2.1.0] - 2022-03-24
[v2.1.0]: https://github.com/lerna-stack/akka-entity-replication/compare/v2.0.0...v2.1.0
Expand Down
37 changes: 26 additions & 11 deletions src/main/scala/lerna/akka/entityreplication/raft/RaftActor.scala
Original file line number Diff line number Diff line change
Expand Up @@ -654,22 +654,22 @@ private[raft] class RaftActor(
become(Follower)
}
}
case installSnapshot =>
case installSnapshot => {
if (installSnapshot.term == currentData.currentTerm) {
applyDomainEvent(DetectedLeaderMember(installSnapshot.srcMemberIndex)) { _ =>
applyDomainEvent(
SnapshotSyncStarted(
installSnapshot.srcLatestSnapshotLastLogTerm,
installSnapshot.srcLatestSnapshotLastLogLogIndex,
),
) { _ =>
startSyncSnapshot(installSnapshot)
become(Follower)
}
attemptToStartSnapshotSync()
}
} else {
applyDomainEvent(DetectedNewTerm(installSnapshot.term)) { _ =>
applyDomainEvent(DetectedLeaderMember(installSnapshot.srcMemberIndex)) { _ =>
attemptToStartSnapshotSync()
}
}
}
def attemptToStartSnapshotSync(): Unit = {
import RaftMemberData.SnapshotSynchronizationDecision
currentData.decideSnapshotSync(installSnapshot) match {
case SnapshotSynchronizationDecision.StartDecision =>
applyDomainEvent(
SnapshotSyncStarted(
installSnapshot.srcLatestSnapshotLastLogTerm,
Expand All @@ -679,9 +679,24 @@ private[raft] class RaftActor(
startSyncSnapshot(installSnapshot)
become(Follower)
}
}
case SnapshotSynchronizationDecision.SkipDecision(matchIndex) =>
val replyMessage = InstallSnapshotSucceeded(shardId, currentData.currentTerm, matchIndex, selfMemberIndex)
if (log.isDebugEnabled) {
log.debug(
"=== [{}] skipped snapshot synchronization for [{}] and replying with [{}]",
currentState,
installSnapshot,
replyMessage,
)
}
region ! ReplicationRegion.DeliverTo(installSnapshot.srcMemberIndex, replyMessage)
become(Follower)
case SnapshotSynchronizationDecision.ErrorDecision(reason) =>
log.error("[{}] ignored [{}]. reason: {}", currentState, installSnapshot, reason)
become(Follower)
}
}
}
}

protected def receiveSyncSnapshotResponse(response: SnapshotSyncManager.Response): Unit =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package lerna.akka.entityreplication.raft
import lerna.akka.entityreplication.ClusterReplicationSerializable
import lerna.akka.entityreplication.model.NormalizedEntityId
import lerna.akka.entityreplication.raft.model._
import lerna.akka.entityreplication.raft.protocol.RaftCommands.InstallSnapshot
import lerna.akka.entityreplication.raft.routing.MemberIndex
import lerna.akka.entityreplication.raft.snapshot.SnapshotProtocol.EntitySnapshotMetadata
import org.slf4j.{ Logger, LoggerFactory }
Expand Down Expand Up @@ -458,6 +459,31 @@ private[entityreplication] object RaftMemberData {
) extends CommittedEntriesForEventSourcingResolveError
}

/** Decision [[RaftActor]] should take about Snapshot Synchronization
*
* @see [[RaftMemberData.decideSnapshotSync]]
*/
sealed trait SnapshotSynchronizationDecision
object SnapshotSynchronizationDecision {

/** [[RaftActor]] should start Snapshot Synchronization */
case object StartDecision extends SnapshotSynchronizationDecision

/** [[RaftActor]] should skip Snapshot Synchronization
*
* RaftActor already has snapshots (or log entries) that Snapshot Synchronization will install. The leader can
* update its match index of this RaftActor to `matchIndex`.
*/
final case class SkipDecision(matchIndex: LogEntryIndex) extends SnapshotSynchronizationDecision

/** [[RaftActor]] should report an error (by such as logging)
*
* RaftActor cannot continue and recover by itself.
*/
final case class ErrorDecision(reason: String) extends SnapshotSynchronizationDecision

}

}

private[entityreplication] trait RaftMemberData
Expand Down Expand Up @@ -594,6 +620,58 @@ private[entityreplication] trait RaftMemberData
)
}

/** Decides what action RaftActor should take from the given InstallSnapshot
*
* Throws an [[IllegalArgumentException]] If `InstallSnapshot.term` is not equal to [[currentTerm]].
*/
def decideSnapshotSync(installSnapshot: InstallSnapshot): RaftMemberData.SnapshotSynchronizationDecision = {
require(
installSnapshot.term == currentTerm,
s"InstallSnapshot.term [${installSnapshot.term.term}] should be equal to currentTerm [${currentTerm.term}]",
)
replicatedLog.termAt(installSnapshot.srcLatestSnapshotLastLogLogIndex) match {
case Some(logTerm) =>
assert(installSnapshot.srcLatestSnapshotLastLogLogIndex >= replicatedLog.ancestorLastIndex)
assert(installSnapshot.srcLatestSnapshotLastLogLogIndex <= replicatedLog.lastLogIndex)
if (installSnapshot.srcLatestSnapshotLastLogTerm == logTerm) {
val matchIndex = Ordering[LogEntryIndex].max(installSnapshot.srcLatestSnapshotLastLogLogIndex, commitIndex)
RaftMemberData.SnapshotSynchronizationDecision.SkipDecision(matchIndex)
} else {
// NOTE: commitIndex can be less than or equal to replicated.ancestorLastIndex
// (for example, immediately after RaftActor started)
if (installSnapshot.srcLatestSnapshotLastLogLogIndex == replicatedLog.ancestorLastIndex) {
RaftMemberData.SnapshotSynchronizationDecision.ErrorDecision(
s"[$installSnapshot] conflicted with a compacted (committed) entry of ReplicatedLog(" +
s"ancestorLastTerm=[${replicatedLog.ancestorLastTerm.term}], " +
s"ancestorLastIndex=[${replicatedLog.ancestorLastIndex}])",
)
} else if (installSnapshot.srcLatestSnapshotLastLogLogIndex <= commitIndex) {
RaftMemberData.SnapshotSynchronizationDecision.ErrorDecision(
s"[$installSnapshot] conflicted with a committed entry (term=[${logTerm.term}]). commitIndex=[${commitIndex}]",
)
} else {
RaftMemberData.SnapshotSynchronizationDecision.StartDecision
}
}
case None =>
if (installSnapshot.srcLatestSnapshotLastLogLogIndex > replicatedLog.lastLogIndex) {
RaftMemberData.SnapshotSynchronizationDecision.StartDecision
} else {
assert(installSnapshot.srcLatestSnapshotLastLogLogIndex < replicatedLog.ancestorLastIndex)
if (installSnapshot.srcLatestSnapshotLastLogTerm > replicatedLog.ancestorLastTerm) {
RaftMemberData.SnapshotSynchronizationDecision.ErrorDecision(
s"[$installSnapshot] conflicted with a compacted (committed) entry of ReplicatedLog(" +
s"ancestorLastTerm=[${replicatedLog.ancestorLastTerm.term}], ancestorLastIndex=[${replicatedLog.ancestorLastIndex}]) " +
"since terms only increase",
)
} else {
val matchIndex = Ordering[LogEntryIndex].max(replicatedLog.ancestorLastIndex, commitIndex)
RaftMemberData.SnapshotSynchronizationDecision.SkipDecision(matchIndex)
}
}
}
}

def startSnapshotSync(snapshotLastLogTerm: Term, snapshotLastLogIndex: LogEntryIndex): RaftMemberData = {
updatePersistentState(
lastSnapshotStatus = lastSnapshotStatus.startSnapshotSync(snapshotLastLogTerm, snapshotLastLogIndex),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -533,4 +533,89 @@ class RaftActorSnapshotSynchronizationSpec
}
}
}

"RaftActor" should {

"skip snapshot synchronization and reply with an InstallSnapshotSucceeded message " +
"if it receives an InstallSnapshot message that installs no new snapshots" in {
val regionProbe = TestProbe()
val shardId = createUniqueShardId()
val followerMemberIndex = createUniqueMemberIndex()
val follower = createRaftActor(
region = regionProbe.ref,
shardId = shardId,
selfMemberIndex = followerMemberIndex,
)
val followerData = {
val log = ReplicatedLog()
.reset(Term(1), LogEntryIndex(6))
.truncateAndAppend(
Seq(
LogEntry(LogEntryIndex(7), EntityEvent(Option(NormalizedEntityId.from("entity-1")), "event-a"), Term(1)),
LogEntry(LogEntryIndex(8), EntityEvent(None, NoOp), Term(2)),
LogEntry(LogEntryIndex(9), EntityEvent(Option(NormalizedEntityId.from("entity-2")), "event-b"), Term(2)),
),
)
RaftMemberData(currentTerm = Term(2), replicatedLog = log, commitIndex = LogEntryIndex(8))
}
setState(follower, Follower, followerData)

val leaderMemberIndex = createUniqueMemberIndex()
val installSnapshot = InstallSnapshot(
shardId,
Term(2),
leaderMemberIndex,
srcLatestSnapshotLastLogTerm = Term(1),
srcLatestSnapshotLastLogLogIndex = LogEntryIndex(7),
)
LoggingTestKit.debug(s"skipped snapshot synchronization for [${installSnapshot}]").expect {
follower ! installSnapshot
}
val replyMessage = regionProbe.fishForSpecificMessage[InstallSnapshotSucceeded]() {
case ReplicationRegion.DeliverTo(`leaderMemberIndex`, replyMessage: InstallSnapshotSucceeded) => replyMessage
}
replyMessage.term should be(Term(2))
replyMessage.shardId should be(shardId)
replyMessage.sender should be(followerMemberIndex)
replyMessage.dstLatestSnapshotLastLogLogIndex should be(LogEntryIndex(8))
}

"log an error if it receives an InstallSnapshot message inconsistent with its state" in {
val regionProbe = TestProbe()
val shardId = createUniqueShardId()
val followerMemberIndex = createUniqueMemberIndex()
val follower = createRaftActor(
shardId = shardId,
selfMemberIndex = followerMemberIndex,
region = regionProbe.ref,
)
val followerData = {
val log = ReplicatedLog()
.reset(Term(1), LogEntryIndex(6))
.truncateAndAppend(
Seq(
LogEntry(LogEntryIndex(7), EntityEvent(Option(NormalizedEntityId("entity-1")), "event-a"), Term(1)),
LogEntry(LogEntryIndex(8), EntityEvent(None, NoOp), Term(2)),
LogEntry(LogEntryIndex(9), EntityEvent(Option(NormalizedEntityId("entity-2")), "event-b"), Term(2)),
),
)
RaftMemberData(currentTerm = Term(2), replicatedLog = log, commitIndex = LogEntryIndex(8))
}
setState(follower, Follower, followerData)

val leaderMemberIndex = createUniqueMemberIndex()
val installSnapshot = InstallSnapshot(
shardId,
Term(2),
leaderMemberIndex,
srcLatestSnapshotLastLogTerm = Term(1),
srcLatestSnapshotLastLogLogIndex = LogEntryIndex(8),
)
LoggingTestKit.error(s"ignored [${installSnapshot}]").expect {
follower ! installSnapshot
}
}

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -393,29 +393,27 @@ class RaftActorSpec
)

val leaderMemberIndex = createUniqueMemberIndex()
val term = Term.initial().next()
val entityId = NormalizedEntityId.from("test-entity")
val logEntries = Seq(
LogEntry(LogEntryIndex(1), EntityEvent(Option(entityId), "a"), term),
LogEntry(LogEntryIndex(2), EntityEvent(Option(entityId), "b"), term),
LogEntry(LogEntryIndex(3), EntityEvent(Option(entityId), "c"), term),
LogEntry(LogEntryIndex(4), EntityEvent(Option(entityId), "d"), term),
LogEntry(LogEntryIndex(1), EntityEvent(None, NoOp), Term(1)),
LogEntry(LogEntryIndex(2), EntityEvent(Option(entityId), "b"), Term(1)),
LogEntry(LogEntryIndex(3), EntityEvent(Option(entityId), "c"), Term(1)),
LogEntry(LogEntryIndex(4), EntityEvent(Option(entityId), "d"), Term(1)),
)
val applicableIndex = LogEntryIndex(3)
val installSnapshot =
InstallSnapshot(
shardId,
term,
Term(3),
leaderMemberIndex,
srcLatestSnapshotLastLogTerm = term,
srcLatestSnapshotLastLogLogIndex = applicableIndex,
srcLatestSnapshotLastLogTerm = Term(2),
srcLatestSnapshotLastLogLogIndex = LogEntryIndex(5),
)
follower ! createAppendEntries(
shardId,
term,
Term(1),
leaderMemberIndex,
entries = logEntries,
leaderCommit = applicableIndex,
leaderCommit = LogEntryIndex(3),
)

// wait for starting compaction
Expand All @@ -432,8 +430,7 @@ class RaftActorSpec
snapshotStore.reply(SaveSnapshotSuccess(msg.snapshot.metadata))
} should have length 1
// compaction completed (snapshot synchronization become available)
LoggingTestKit.info("Snapshot synchronization already completed").expect {
// the snapshots has been already saved by compaction
LoggingTestKit.info("Snapshot synchronization started").expect {
follower ! installSnapshot
}
}
Expand Down
Loading