Skip to content

Commit

Permalink
Merge pull request #168 from lerna-stack/skip-unnecessary-snapshot-sy…
Browse files Browse the repository at this point in the history
…nchronization

Skip unnecessary snapshot synchronization
  • Loading branch information
negokaz authored Aug 4, 2022
2 parents fcc83f3 + d28640e commit 10c8ee7
Show file tree
Hide file tree
Showing 6 changed files with 575 additions and 25 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- A RaftAcotor(Leader) could mis-deliver a ReplicationSucceeded message to a different entity
[156](https://github.com/lerna-stack/akka-entity-replication/issues/156),
[#PR162](https://github.com/lerna-stack/akka-entity-replication/pull/162)
- Snapshot synchronization could remove committed log entries that not be included in snapshots
[#167](https://github.com/lerna-stack/akka-entity-replication/issues/167)
[#PR168](https://github.com/lerna-stack/akka-entity-replication/pull/168)

## [v2.1.0] - 2022-03-24
[v2.1.0]: https://github.com/lerna-stack/akka-entity-replication/compare/v2.0.0...v2.1.0
Expand Down
37 changes: 26 additions & 11 deletions src/main/scala/lerna/akka/entityreplication/raft/RaftActor.scala
Original file line number Diff line number Diff line change
Expand Up @@ -654,22 +654,22 @@ private[raft] class RaftActor(
become(Follower)
}
}
case installSnapshot =>
case installSnapshot => {
if (installSnapshot.term == currentData.currentTerm) {
applyDomainEvent(DetectedLeaderMember(installSnapshot.srcMemberIndex)) { _ =>
applyDomainEvent(
SnapshotSyncStarted(
installSnapshot.srcLatestSnapshotLastLogTerm,
installSnapshot.srcLatestSnapshotLastLogLogIndex,
),
) { _ =>
startSyncSnapshot(installSnapshot)
become(Follower)
}
attemptToStartSnapshotSync()
}
} else {
applyDomainEvent(DetectedNewTerm(installSnapshot.term)) { _ =>
applyDomainEvent(DetectedLeaderMember(installSnapshot.srcMemberIndex)) { _ =>
attemptToStartSnapshotSync()
}
}
}
def attemptToStartSnapshotSync(): Unit = {
import RaftMemberData.SnapshotSynchronizationDecision
currentData.decideSnapshotSync(installSnapshot) match {
case SnapshotSynchronizationDecision.StartDecision =>
applyDomainEvent(
SnapshotSyncStarted(
installSnapshot.srcLatestSnapshotLastLogTerm,
Expand All @@ -679,9 +679,24 @@ private[raft] class RaftActor(
startSyncSnapshot(installSnapshot)
become(Follower)
}
}
case SnapshotSynchronizationDecision.SkipDecision(matchIndex) =>
val replyMessage = InstallSnapshotSucceeded(shardId, currentData.currentTerm, matchIndex, selfMemberIndex)
if (log.isDebugEnabled) {
log.debug(
"=== [{}] skipped snapshot synchronization for [{}] and replying with [{}]",
currentState,
installSnapshot,
replyMessage,
)
}
region ! ReplicationRegion.DeliverTo(installSnapshot.srcMemberIndex, replyMessage)
become(Follower)
case SnapshotSynchronizationDecision.ErrorDecision(reason) =>
log.error("[{}] ignored [{}]. reason: {}", currentState, installSnapshot, reason)
become(Follower)
}
}
}
}

protected def receiveSyncSnapshotResponse(response: SnapshotSyncManager.Response): Unit =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package lerna.akka.entityreplication.raft
import lerna.akka.entityreplication.ClusterReplicationSerializable
import lerna.akka.entityreplication.model.NormalizedEntityId
import lerna.akka.entityreplication.raft.model._
import lerna.akka.entityreplication.raft.protocol.RaftCommands.InstallSnapshot
import lerna.akka.entityreplication.raft.routing.MemberIndex
import lerna.akka.entityreplication.raft.snapshot.SnapshotProtocol.EntitySnapshotMetadata
import org.slf4j.{ Logger, LoggerFactory }
Expand Down Expand Up @@ -458,6 +459,31 @@ private[entityreplication] object RaftMemberData {
) extends CommittedEntriesForEventSourcingResolveError
}

/** Decision [[RaftActor]] should take about Snapshot Synchronization
*
* @see [[RaftMemberData.decideSnapshotSync]]
*/
sealed trait SnapshotSynchronizationDecision
object SnapshotSynchronizationDecision {

/** [[RaftActor]] should start Snapshot Synchronization */
case object StartDecision extends SnapshotSynchronizationDecision

/** [[RaftActor]] should skip Snapshot Synchronization
*
* RaftActor already has snapshots (or log entries) that Snapshot Synchronization will install. The leader can
* update its match index of this RaftActor to `matchIndex`.
*/
final case class SkipDecision(matchIndex: LogEntryIndex) extends SnapshotSynchronizationDecision

/** [[RaftActor]] should report an error (by such as logging)
*
* RaftActor cannot continue and recover by itself.
*/
final case class ErrorDecision(reason: String) extends SnapshotSynchronizationDecision

}

}

private[entityreplication] trait RaftMemberData
Expand Down Expand Up @@ -594,6 +620,58 @@ private[entityreplication] trait RaftMemberData
)
}

/** Decides what action RaftActor should take from the given InstallSnapshot
*
* Throws an [[IllegalArgumentException]] If `InstallSnapshot.term` is not equal to [[currentTerm]].
*/
def decideSnapshotSync(installSnapshot: InstallSnapshot): RaftMemberData.SnapshotSynchronizationDecision = {
require(
installSnapshot.term == currentTerm,
s"InstallSnapshot.term [${installSnapshot.term.term}] should be equal to currentTerm [${currentTerm.term}]",
)
replicatedLog.termAt(installSnapshot.srcLatestSnapshotLastLogLogIndex) match {
case Some(logTerm) =>
assert(installSnapshot.srcLatestSnapshotLastLogLogIndex >= replicatedLog.ancestorLastIndex)
assert(installSnapshot.srcLatestSnapshotLastLogLogIndex <= replicatedLog.lastLogIndex)
if (installSnapshot.srcLatestSnapshotLastLogTerm == logTerm) {
val matchIndex = Ordering[LogEntryIndex].max(installSnapshot.srcLatestSnapshotLastLogLogIndex, commitIndex)
RaftMemberData.SnapshotSynchronizationDecision.SkipDecision(matchIndex)
} else {
// NOTE: commitIndex can be less than or equal to replicated.ancestorLastIndex
// (for example, immediately after RaftActor started)
if (installSnapshot.srcLatestSnapshotLastLogLogIndex == replicatedLog.ancestorLastIndex) {
RaftMemberData.SnapshotSynchronizationDecision.ErrorDecision(
s"[$installSnapshot] conflicted with a compacted (committed) entry of ReplicatedLog(" +
s"ancestorLastTerm=[${replicatedLog.ancestorLastTerm.term}], " +
s"ancestorLastIndex=[${replicatedLog.ancestorLastIndex}])",
)
} else if (installSnapshot.srcLatestSnapshotLastLogLogIndex <= commitIndex) {
RaftMemberData.SnapshotSynchronizationDecision.ErrorDecision(
s"[$installSnapshot] conflicted with a committed entry (term=[${logTerm.term}]). commitIndex=[${commitIndex}]",
)
} else {
RaftMemberData.SnapshotSynchronizationDecision.StartDecision
}
}
case None =>
if (installSnapshot.srcLatestSnapshotLastLogLogIndex > replicatedLog.lastLogIndex) {
RaftMemberData.SnapshotSynchronizationDecision.StartDecision
} else {
assert(installSnapshot.srcLatestSnapshotLastLogLogIndex < replicatedLog.ancestorLastIndex)
if (installSnapshot.srcLatestSnapshotLastLogTerm > replicatedLog.ancestorLastTerm) {
RaftMemberData.SnapshotSynchronizationDecision.ErrorDecision(
s"[$installSnapshot] conflicted with a compacted (committed) entry of ReplicatedLog(" +
s"ancestorLastTerm=[${replicatedLog.ancestorLastTerm.term}], ancestorLastIndex=[${replicatedLog.ancestorLastIndex}]) " +
"since terms only increase",
)
} else {
val matchIndex = Ordering[LogEntryIndex].max(replicatedLog.ancestorLastIndex, commitIndex)
RaftMemberData.SnapshotSynchronizationDecision.SkipDecision(matchIndex)
}
}
}
}

def startSnapshotSync(snapshotLastLogTerm: Term, snapshotLastLogIndex: LogEntryIndex): RaftMemberData = {
updatePersistentState(
lastSnapshotStatus = lastSnapshotStatus.startSnapshotSync(snapshotLastLogTerm, snapshotLastLogIndex),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -533,4 +533,89 @@ class RaftActorSnapshotSynchronizationSpec
}
}
}

"RaftActor" should {

"skip snapshot synchronization and reply with an InstallSnapshotSucceeded message " +
"if it receives an InstallSnapshot message that installs no new snapshots" in {
val regionProbe = TestProbe()
val shardId = createUniqueShardId()
val followerMemberIndex = createUniqueMemberIndex()
val follower = createRaftActor(
region = regionProbe.ref,
shardId = shardId,
selfMemberIndex = followerMemberIndex,
)
val followerData = {
val log = ReplicatedLog()
.reset(Term(1), LogEntryIndex(6))
.truncateAndAppend(
Seq(
LogEntry(LogEntryIndex(7), EntityEvent(Option(NormalizedEntityId.from("entity-1")), "event-a"), Term(1)),
LogEntry(LogEntryIndex(8), EntityEvent(None, NoOp), Term(2)),
LogEntry(LogEntryIndex(9), EntityEvent(Option(NormalizedEntityId.from("entity-2")), "event-b"), Term(2)),
),
)
RaftMemberData(currentTerm = Term(2), replicatedLog = log, commitIndex = LogEntryIndex(8))
}
setState(follower, Follower, followerData)

val leaderMemberIndex = createUniqueMemberIndex()
val installSnapshot = InstallSnapshot(
shardId,
Term(2),
leaderMemberIndex,
srcLatestSnapshotLastLogTerm = Term(1),
srcLatestSnapshotLastLogLogIndex = LogEntryIndex(7),
)
LoggingTestKit.debug(s"skipped snapshot synchronization for [${installSnapshot}]").expect {
follower ! installSnapshot
}
val replyMessage = regionProbe.fishForSpecificMessage[InstallSnapshotSucceeded]() {
case ReplicationRegion.DeliverTo(`leaderMemberIndex`, replyMessage: InstallSnapshotSucceeded) => replyMessage
}
replyMessage.term should be(Term(2))
replyMessage.shardId should be(shardId)
replyMessage.sender should be(followerMemberIndex)
replyMessage.dstLatestSnapshotLastLogLogIndex should be(LogEntryIndex(8))
}

"log an error if it receives an InstallSnapshot message inconsistent with its state" in {
val regionProbe = TestProbe()
val shardId = createUniqueShardId()
val followerMemberIndex = createUniqueMemberIndex()
val follower = createRaftActor(
shardId = shardId,
selfMemberIndex = followerMemberIndex,
region = regionProbe.ref,
)
val followerData = {
val log = ReplicatedLog()
.reset(Term(1), LogEntryIndex(6))
.truncateAndAppend(
Seq(
LogEntry(LogEntryIndex(7), EntityEvent(Option(NormalizedEntityId("entity-1")), "event-a"), Term(1)),
LogEntry(LogEntryIndex(8), EntityEvent(None, NoOp), Term(2)),
LogEntry(LogEntryIndex(9), EntityEvent(Option(NormalizedEntityId("entity-2")), "event-b"), Term(2)),
),
)
RaftMemberData(currentTerm = Term(2), replicatedLog = log, commitIndex = LogEntryIndex(8))
}
setState(follower, Follower, followerData)

val leaderMemberIndex = createUniqueMemberIndex()
val installSnapshot = InstallSnapshot(
shardId,
Term(2),
leaderMemberIndex,
srcLatestSnapshotLastLogTerm = Term(1),
srcLatestSnapshotLastLogLogIndex = LogEntryIndex(8),
)
LoggingTestKit.error(s"ignored [${installSnapshot}]").expect {
follower ! installSnapshot
}
}

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -393,29 +393,27 @@ class RaftActorSpec
)

val leaderMemberIndex = createUniqueMemberIndex()
val term = Term.initial().next()
val entityId = NormalizedEntityId.from("test-entity")
val logEntries = Seq(
LogEntry(LogEntryIndex(1), EntityEvent(Option(entityId), "a"), term),
LogEntry(LogEntryIndex(2), EntityEvent(Option(entityId), "b"), term),
LogEntry(LogEntryIndex(3), EntityEvent(Option(entityId), "c"), term),
LogEntry(LogEntryIndex(4), EntityEvent(Option(entityId), "d"), term),
LogEntry(LogEntryIndex(1), EntityEvent(None, NoOp), Term(1)),
LogEntry(LogEntryIndex(2), EntityEvent(Option(entityId), "b"), Term(1)),
LogEntry(LogEntryIndex(3), EntityEvent(Option(entityId), "c"), Term(1)),
LogEntry(LogEntryIndex(4), EntityEvent(Option(entityId), "d"), Term(1)),
)
val applicableIndex = LogEntryIndex(3)
val installSnapshot =
InstallSnapshot(
shardId,
term,
Term(3),
leaderMemberIndex,
srcLatestSnapshotLastLogTerm = term,
srcLatestSnapshotLastLogLogIndex = applicableIndex,
srcLatestSnapshotLastLogTerm = Term(2),
srcLatestSnapshotLastLogLogIndex = LogEntryIndex(5),
)
follower ! createAppendEntries(
shardId,
term,
Term(1),
leaderMemberIndex,
entries = logEntries,
leaderCommit = applicableIndex,
leaderCommit = LogEntryIndex(3),
)

// wait for starting compaction
Expand All @@ -432,8 +430,7 @@ class RaftActorSpec
snapshotStore.reply(SaveSnapshotSuccess(msg.snapshot.metadata))
} should have length 1
// compaction completed (snapshot synchronization become available)
LoggingTestKit.info("Snapshot synchronization already completed").expect {
// the snapshots has been already saved by compaction
LoggingTestKit.info("Snapshot synchronization started").expect {
follower ! installSnapshot
}
}
Expand Down
Loading

0 comments on commit 10c8ee7

Please sign in to comment.