Skip to content

Commit

Permalink
Merge pull request #217 from lerna-stack/fix-old-snapshot-read-in-con…
Browse files Browse the repository at this point in the history
…secutive-snapshot-synchronization

Fix old snapshot read in consecutive snapshot synchronization
  • Loading branch information
negokaz authored Nov 6, 2023
2 parents 377d39a + 458ef31 commit 88d0bb3
Show file tree
Hide file tree
Showing 5 changed files with 325 additions and 43 deletions.
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,11 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- Improve snapshot synchronization diagnostic logging
[PR#216](https://github.com/lerna-stack/akka-entity-replication/pull/216)

### Fixed
- Consecutive snapshot synchronizations could not synchronize new entity snapshots
[#215](https://github.com/lerna-stack/akka-entity-replication/issues/215),
[PR#217](https://github.com/lerna-stack/akka-entity-replication/pull/217)

## [v2.3.0] - 2023-06-19
[v2.3.0]: https://github.com/lerna-stack/akka-entity-replication/compare/v2.2.0...v2.3.0

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
# It's safe to exclude the following since Stop is a private class.
ProblemFilters.exclude[MissingClassProblem]("lerna.akka.entityreplication.raft.snapshot.sync.SnapshotSyncManager$Stop$")
Original file line number Diff line number Diff line change
Expand Up @@ -77,9 +77,6 @@ private[entityreplication] object SnapshotSyncManager {

final case class SyncSnapshotFailed() extends Response

/** A `SnapshotSyncManager` will stop if it receives a `Stop` message unless its synchronization is in progress. */
private final case object Stop extends Command

sealed trait Event

final case class SnapshotCopied(
Expand Down Expand Up @@ -189,6 +186,20 @@ private[entityreplication] object SnapshotSyncManager {
)
}

/**
* Executes snapshot synchronization
*
* `SnapshotSyncManager` executes only one snapshot synchronization in its lifecycle.
* This design is related to the following aspects:
*
* - It reads snapshots from a [[lerna.akka.entityreplication.raft.snapshot.SnapshotStore]] that it spawns as its
* descendant. The `SnapshotStore` is a different instance from a `SnapshotStore` running as a descendant of
* [[lerna.akka.entityreplication.raft.RaftActor]] of the source replica group. The `SnapshotStore` cannot read new
* snapshots the other one wrote (for example, by compaction) if it has already recovered from persisted data.
* - If it reuses an instance of `SnapshotStore` among multiple snapshot synchronizations, it cannot read a new
* snapshot written by a compaction that happens during the synchronization. It should spawn dedicated instances of
* `SnapshotStore` for each synchronization or execute only one snapshot synchronization in its lifecycle.
*/
private[entityreplication] class SnapshotSyncManager(
typeName: TypeName,
srcMemberIndex: MemberIndex,
Expand Down Expand Up @@ -247,7 +258,14 @@ private[entityreplication] class SnapshotSyncManager(
}
this.state = snapshot

case event: Event => updateState(event)
case event: Event =>
event match {
case _: SnapshotCopied =>
updateState(event)
case _: SyncCompleted =>
updateState(event)
context.become(ready)
}

case RecoveryCompleted =>
if (log.isInfoEnabled) {
Expand All @@ -261,6 +279,11 @@ private[entityreplication] class SnapshotSyncManager(

override def receiveCommand: Receive = ready

/** Behavior in the ready state
*
* `SnapshotSyncManager` accepts a new snapshot synchronization request. After it accepts the request, it will
* transition to the synchronizing state to execute the synchronization.
*/
def ready: Receive = {

case SyncSnapshot(
Expand Down Expand Up @@ -313,14 +336,29 @@ private[entityreplication] class SnapshotSyncManager(
s"(typeName: $typeName, memberIndex: $dstMemberIndex, snapshotLastLogTerm: ${dstLatestSnapshotLastLogTerm.term}, snapshotLastLogIndex: $dstLatestSnapshotLastLogIndex)",
)

case message if handleAkkaPersistenceMessage.isDefinedAt(message) =>
handleAkkaPersistenceMessage(message)
case syncStatus: SyncStatus =>
if (log.isWarningEnabled) {
log.warning("Dropping unexpected SyncStatus: [{}]", syncStatus)
}

case Stop =>
stopSelf()
case failureStatus: Status.Failure =>
if (log.isWarningEnabled) {
log.warning("Dropping unexpected Status.Failure: [{}]", failureStatus)
}

case akkaPersistenceMessage if handleAkkaPersistenceMessage.isDefinedAt(akkaPersistenceMessage) =>
if (log.isWarningEnabled) {
log.warning("Dropping unexpected Akka Persistence message: [{}]", akkaPersistenceMessage)
}

}

/** Behavior in the synchronizing state
*
* `SnapshotSyncManager` synchronizes entity snapshots in the background. It will send the synchronization result to
* the reply-to actor after it completes the synchronization. It will also transition to the finalizing state if any
* finalizing (deleting events or snapshots) is needed.
*/
def synchronizing(
replyTo: ActorRef,
srcLatestSnapshotLastLogIndex: LogEntryIndex,
Expand Down Expand Up @@ -366,6 +404,7 @@ private[entityreplication] class SnapshotSyncManager(
// complete all
persist(SyncCompleted(event.offset)) { event =>
updateState(event)
context.become(finalizing)
saveSnapshot(this.state)
replyTo ! SyncSnapshotSucceeded(
completePartially.snapshotLastLogTerm,
Expand Down Expand Up @@ -412,24 +451,47 @@ private[entityreplication] class SnapshotSyncManager(
)
stopSelf()

case message if handleAkkaPersistenceMessage.isDefinedAt(message) =>
handleAkkaPersistenceMessage(message)
case akkaPersistenceMessage if handleAkkaPersistenceMessage.isDefinedAt(akkaPersistenceMessage) =>
if (log.isWarningEnabled) {
log.warning("Dropping unexpected Akka Persistence message: [{}]", akkaPersistenceMessage)
}

}

case Stop =>
/** Behavior in the finalizing state
*
* `SnapshotSyncManager` handles finalizing (saving snapshots, deleting events, and deleting snapshots) and stops
* eventually.
*/
private def finalizing: Receive = {

case syncSnapshot: SyncSnapshot =>
if (log.isDebugEnabled) {
log.debug("Dropping Stop since the new snapshot synchronization is in progress.")
log.debug("Dropping [{}] since the snapshot synchronization is finalizing.", syncSnapshot)
}

case syncStatus: SyncStatus =>
if (log.isWarningEnabled) {
log.warning("Dropping unexpected SyncStatus: [{}]", syncStatus)
}

case failureStatus: Status.Failure =>
if (log.isWarningEnabled) {
log.warning("Dropping unexpected Status.Failure: [{}]", failureStatus)
}

case akkaPersistenceMessage if handleAkkaPersistenceMessage.isDefinedAt(akkaPersistenceMessage) =>
// This actor will stop eventually after all deletions are completed.
handleAkkaPersistenceMessage(akkaPersistenceMessage)

}

def updateState(event: Event): Unit =
event match {
case event: SnapshotCopied =>
this.state = SyncProgress(event.offset)
// keep current behavior
case SyncCompleted(offset) =>
this.state = SyncProgress(offset)
context.become(ready)
}

private def stopSelf(): Unit = {
Expand Down Expand Up @@ -640,15 +702,24 @@ private[entityreplication] class SnapshotSyncManager(

/** Handles an Akka Persistence message
*
* This `SnapshotSyncManager` will attempt to stop after it handles a series of snapshot save, (optional) event
* deletion, and (optional) snapshot deletion. Attempt to stop is driven by sending and receiving a
* [[SnapshotSyncManager.Stop]] message.
* `SnapshotSyncManager` will stop after it handles a series of snapshot save, (optional) event deletion, and
* (optional) snapshot deletion.
*
* `SnapshotSyncManager` will delete events first, then snapshot. It's because:
*
* This `SnapshotSyncManager` will delete events first, then snapshot. It's because:
* There is a subtle timing at which the snapshot deletion completes before the event deletion completes. In this
* case, the manager doesn't log a message about completing the event deletion since it stops before receiving the
* completion message of the event deletion. To avoid such a case, the manager waits to complete the event deletion,
* then deletes the snapshot.
* If `SnapshotSyncManager` deletes events and snapshots in parallel, there is a subtle timing at which the
* snapshot deletion completes before the event deletion completes, and vice versa. Suppose `SnapshotSyncManager`
* stops immediately after the snapshot deletion completes, but the event deletion is not yet completed. In that
* case, `SnapshotSyncManager` cannot log a message about the event deletion completion. For this reason,
* `SnapshotSyncManager` should delete events and snapshots serially or stop after waiting for both deletions to
* complete.
*
* While `SnapshotSyncManager` can choose the order of such deletions, deleting events before snapshots enables
* `SnapshotSyncManager` to stop as soon as possible. Note that the upper sequence number of the event deletion is less
* than or equal to that of the snapshot deletion. If `SnapshotSyncManager` deletes no events, it also deletes no
* snapshots. In that case, `SnapshotSyncManager` can stop immediately after it deletes no events. Conversely, there
* is a chance to delete events even if the manager deletes no snapshots. In this case, `SnapshotSyncManager` cannot
* stop immediately.
*/
private def handleAkkaPersistenceMessage: Receive = {
case success: akka.persistence.SaveSnapshotSuccess =>
Expand All @@ -675,9 +746,9 @@ private[entityreplication] class SnapshotSyncManager(
} else if (shouldDeleteOldSnapshots) {
deleteOldSnapshots(success.metadata.sequenceNr - deletionRelativeSequenceNr)
} else {
// Attempt to stop itself if no deletions are enabled.
// If any deletion is enabled, this actor will attempt to stop after the deletion completes (succeeds or fails).
self ! Stop
// Stop itself if no deletions are enabled.
// If any deletion is enabled, this actor will stop after the deletion completes (succeeds or fails).
stopSelf()
}
}

Expand All @@ -690,7 +761,7 @@ private[entityreplication] class SnapshotSyncManager(
failure.cause.getMessage,
)
}
self ! Stop
stopSelf()
}

private def handleDeleteMessagesSuccess(success: akka.persistence.DeleteMessagesSuccess): Unit = {
Expand All @@ -700,9 +771,9 @@ private[entityreplication] class SnapshotSyncManager(
if (shouldDeleteOldSnapshots) {
// Subtraction of `deletionRelativeSequenceNr` is not needed, since it's already subtracted at the event deletion.
deleteOldSnapshots(success.toSequenceNr)
// This actor will attempt to stop after the snapshot deletion completes (succeeds or fails).
// This actor will stop after the snapshot deletion completes (succeeds or fails).
} else {
self ! Stop
stopSelf()
}
}

Expand All @@ -715,14 +786,14 @@ private[entityreplication] class SnapshotSyncManager(
failure.cause.getMessage,
)
}
self ! Stop
stopSelf()
}

private def handleDeleteSnapshotsSuccess(success: akka.persistence.DeleteSnapshotsSuccess): Unit = {
if (log.isInfoEnabled) {
log.info("Succeeded to deleteSnapshots given criteria [{}]", success.criteria)
}
self ! Stop
stopSelf()
}

private def handleDeleteSnapshotsFailure(failure: akka.persistence.DeleteSnapshotsFailure): Unit = {
Expand All @@ -734,7 +805,7 @@ private[entityreplication] class SnapshotSyncManager(
failure.cause.getMessage,
)
}
self ! Stop
stopSelf()
}

private def deleteOldEvents(upperSequenceNr: Long): Unit = {
Expand All @@ -746,9 +817,9 @@ private[entityreplication] class SnapshotSyncManager(
}
deleteMessages(deleteEventsToSequenceNr)
} else {
// Attempt to stop itself if it deletes no events.
// Stop itself if it deletes no events.
// No event deletion means that it will also delete no snapshots.
self ! Stop
stopSelf()
}
}

Expand All @@ -765,8 +836,8 @@ private[entityreplication] class SnapshotSyncManager(
}
deleteSnapshots(deletionCriteria)
} else {
// Attempt to stop itself if it deletes no snapshots.
self ! Stop
// Stop itself if it deletes no snapshots.
stopSelf()
}
}

Expand Down
Loading

0 comments on commit 88d0bb3

Please sign in to comment.