Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix old snapshot read in consecutive snapshot synchronization #217

Merged
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,11 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- Improve snapshot synchronization diagnostic logging
[PR#216](https://github.com/lerna-stack/akka-entity-replication/pull/216)

### Fixed
- Consecutive snapshot synchronizations could not synchronize new entity snapshots
[#215](https://github.com/lerna-stack/akka-entity-replication/issues/215),
[PR#217](https://github.com/lerna-stack/akka-entity-replication/pull/217)

## [v2.3.0] - 2023-06-19
[v2.3.0]: https://github.com/lerna-stack/akka-entity-replication/compare/v2.2.0...v2.3.0

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,9 +77,6 @@ private[entityreplication] object SnapshotSyncManager {

final case class SyncSnapshotFailed() extends Response

/** A `SnapshotSyncManager` will stop if it receives a `Stop` message unless its synchronization is in progress. */
private final case object Stop extends Command
xirc marked this conversation as resolved.
Show resolved Hide resolved

sealed trait Event

final case class SnapshotCopied(
Expand Down Expand Up @@ -189,6 +186,20 @@ private[entityreplication] object SnapshotSyncManager {
)
}

/**
* Executes snapshot synchronization
*
* `SnapshotSyncManager` executes only one snapshot synchronization in its lifecycle.
* This design is related to the following aspects:
*
* - It reads snapshots from a [[lerna.akka.entityreplication.raft.snapshot.SnapshotStore]] that it spawns as its
* descendant. The `SnapshotStore` is a different instance from a `SnapshotStore` running as a descendant of
* [[lerna.akka.entityreplication.raft.RaftActor]] of the source replica group. The `SnapshotStore` cannot read new
* snapshots the other one wrote (for example, by compaction) if it has already recovered from persisted data.
* - If it reuses an instance of `SnapshotStore` across multiple snapshot synchronizations, it cannot read a new
* snapshot written by a compaction that happens during the synchronization. It should either spawn a dedicated
* `SnapshotStore` instance for each synchronization or execute only one snapshot synchronization in its lifecycle.
*/
private[entityreplication] class SnapshotSyncManager(
typeName: TypeName,
srcMemberIndex: MemberIndex,
Expand Down Expand Up @@ -247,7 +258,14 @@ private[entityreplication] class SnapshotSyncManager(
}
this.state = snapshot

case event: Event => updateState(event)
case event: Event =>
event match {
case _: SnapshotCopied =>
updateState(event)
case _: SyncCompleted =>
updateState(event)
context.become(ready)
}

case RecoveryCompleted =>
if (log.isInfoEnabled) {
Expand All @@ -261,6 +279,11 @@ private[entityreplication] class SnapshotSyncManager(

override def receiveCommand: Receive = ready

/** Behavior in the ready state
*
* `SnapshotSyncManager` accepts a new snapshot synchronization request. After it accepts the request, it will
* transition to the synchronizing state to execute the synchronization.
*/
def ready: Receive = {

case SyncSnapshot(
Expand Down Expand Up @@ -313,14 +336,29 @@ private[entityreplication] class SnapshotSyncManager(
s"(typeName: $typeName, memberIndex: $dstMemberIndex, snapshotLastLogTerm: ${dstLatestSnapshotLastLogTerm.term}, snapshotLastLogIndex: $dstLatestSnapshotLastLogIndex)",
)

case message if handleAkkaPersistenceMessage.isDefinedAt(message) =>
handleAkkaPersistenceMessage(message)
case syncStatus: SyncStatus =>
if (log.isWarningEnabled) {
log.warning("Dropping unexpected SyncStatus: [{}]", syncStatus)
}

case Stop =>
stopSelf()
case failureStatus: Status.Failure =>
if (log.isWarningEnabled) {
log.warning("Dropping unexpected Status.Failure: [{}]", failureStatus)
}

case akkaPersistenceMessage if handleAkkaPersistenceMessage.isDefinedAt(akkaPersistenceMessage) =>
if (log.isWarningEnabled) {
log.warning("Dropping unexpected Akka Persistence message: [{}]", akkaPersistenceMessage)
}

}

/** Behavior in the synchronizing state
*
* `SnapshotSyncManager` synchronizes entity snapshots in the background. It will send the synchronization result to
* the reply-to actor after it completes the synchronization. It will also transition to the finalizing state if any
* finalization (deleting events or snapshots) is needed.
*/
def synchronizing(
replyTo: ActorRef,
srcLatestSnapshotLastLogIndex: LogEntryIndex,
Expand Down Expand Up @@ -366,6 +404,7 @@ private[entityreplication] class SnapshotSyncManager(
// complete all
persist(SyncCompleted(event.offset)) { event =>
updateState(event)
context.become(finalizing)
saveSnapshot(this.state)
replyTo ! SyncSnapshotSucceeded(
completePartially.snapshotLastLogTerm,
Expand Down Expand Up @@ -412,24 +451,47 @@ private[entityreplication] class SnapshotSyncManager(
)
stopSelf()

case message if handleAkkaPersistenceMessage.isDefinedAt(message) =>
handleAkkaPersistenceMessage(message)
case akkaPersistenceMessage if handleAkkaPersistenceMessage.isDefinedAt(akkaPersistenceMessage) =>
if (log.isWarningEnabled) {
log.warning("Dropping unexpected Akka Persistence message: [{}]", akkaPersistenceMessage)
}

}

case Stop =>
/** Behavior in the finalizing state
*
* `SnapshotSyncManager` handles finalizing (saving snapshots, deleting events, and deleting snapshots) and stops
* eventually.
*/
private def finalizing: Receive = {

case syncSnapshot: SyncSnapshot =>
if (log.isDebugEnabled) {
log.debug("Dropping Stop since the new snapshot synchronization is in progress.")
log.debug("Dropping [{}] since the snapshot synchronization is finalizing.", syncSnapshot)
}

case syncStatus: SyncStatus =>
if (log.isWarningEnabled) {
log.warning("Dropping unexpected SyncStatus: [{}]", syncStatus)
}

case failureStatus: Status.Failure =>
if (log.isWarningEnabled) {
log.warning("Dropping unexpected Status.Failure: [{}]", failureStatus)
}

case akkaPersistenceMessage if handleAkkaPersistenceMessage.isDefinedAt(akkaPersistenceMessage) =>
// This actor will stop eventually after all deletions are completed.
handleAkkaPersistenceMessage(akkaPersistenceMessage)

}

def updateState(event: Event): Unit =
event match {
case event: SnapshotCopied =>
this.state = SyncProgress(event.offset)
// keep current behavior
case SyncCompleted(offset) =>
this.state = SyncProgress(offset)
context.become(ready)
}

private def stopSelf(): Unit = {
Expand Down Expand Up @@ -640,15 +702,24 @@ private[entityreplication] class SnapshotSyncManager(

/** Handles an Akka Persistence message
*
* This `SnapshotSyncManager` will attempt to stop after it handles a series of snapshot save, (optional) event
* deletion, and (optional) snapshot deletion. Attempt to stop is driven by sending and receiving a
* [[SnapshotSyncManager.Stop]] message.
* `SnapshotSyncManager` will stop after it handles a series of snapshot save, (optional) event deletion, and
* (optional) snapshot deletion.
*
* `SnapshotSyncManager` will delete events first, then snapshot. It's because:
*
* This `SnapshotSyncManager` will delete events first, then snapshot. It's because:
* There is a subtle timing at which the snapshot deletion completes before the event deletion completes. In this
* case, the manager doesn't log a message about completing the event deletion since it stops before receiving the
* completion message of the event deletion. To avoid such a case, the manager waits to complete the event deletion,
* then deletes the snapshot.
* If `SnapshotSyncManager` deleted events and snapshots in parallel, there would be a subtle timing at which the
* snapshot deletion completes before the event deletion completes, or vice versa. Suppose `SnapshotSyncManager`
* stops immediately after the snapshot deletion completes, but the event deletion has not yet completed. In that
* case, `SnapshotSyncManager` cannot log a message about the completion of the event deletion. For this reason,
* `SnapshotSyncManager` should either delete events and snapshots serially or stop only after both deletions have
* completed.
*
* While `SnapshotSyncManager` can choose the order of such deletions, deleting events before snapshots enables
* `SnapshotSyncManager` to stop as soon as possible. Note that the upper sequence number of the event deletion is
* less than or equal to that of the snapshot deletion. If `SnapshotSyncManager` deletes no events, it also deletes
* no snapshots. In that case, `SnapshotSyncManager` can stop immediately once it finds that there are no events to
* delete. Conversely, events may still need to be deleted even if the manager deletes no snapshots. In this case,
* `SnapshotSyncManager` cannot stop immediately.
*/
private def handleAkkaPersistenceMessage: Receive = {
case success: akka.persistence.SaveSnapshotSuccess =>
Expand All @@ -675,9 +746,9 @@ private[entityreplication] class SnapshotSyncManager(
} else if (shouldDeleteOldSnapshots) {
deleteOldSnapshots(success.metadata.sequenceNr - deletionRelativeSequenceNr)
} else {
// Attempt to stop itself if no deletions are enabled.
// If any deletion is enabled, this actor will attempt to stop after the deletion completes (succeeds or fails).
self ! Stop
// Stop itself if no deletions are enabled.
// If any deletion is enabled, this actor will stop after the deletion completes (succeeds or fails).
stopSelf()
}
}

Expand All @@ -690,7 +761,7 @@ private[entityreplication] class SnapshotSyncManager(
failure.cause.getMessage,
)
}
self ! Stop
stopSelf()
}

private def handleDeleteMessagesSuccess(success: akka.persistence.DeleteMessagesSuccess): Unit = {
Expand All @@ -700,9 +771,9 @@ private[entityreplication] class SnapshotSyncManager(
if (shouldDeleteOldSnapshots) {
// Subtraction of `deletionRelativeSequenceNr` is not needed, since it's already subtracted at the event deletion.
deleteOldSnapshots(success.toSequenceNr)
// This actor will attempt to stop after the snapshot deletion completes (succeeds or fails).
// This actor will stop after the snapshot deletion completes (succeeds or fails).
} else {
self ! Stop
stopSelf()
}
}

Expand All @@ -715,14 +786,14 @@ private[entityreplication] class SnapshotSyncManager(
failure.cause.getMessage,
)
}
self ! Stop
stopSelf()
}

private def handleDeleteSnapshotsSuccess(success: akka.persistence.DeleteSnapshotsSuccess): Unit = {
if (log.isInfoEnabled) {
log.info("Succeeded to deleteSnapshots given criteria [{}]", success.criteria)
}
self ! Stop
stopSelf()
}

private def handleDeleteSnapshotsFailure(failure: akka.persistence.DeleteSnapshotsFailure): Unit = {
Expand All @@ -734,7 +805,7 @@ private[entityreplication] class SnapshotSyncManager(
failure.cause.getMessage,
)
}
self ! Stop
stopSelf()
}

private def deleteOldEvents(upperSequenceNr: Long): Unit = {
Expand All @@ -746,9 +817,9 @@ private[entityreplication] class SnapshotSyncManager(
}
deleteMessages(deleteEventsToSequenceNr)
} else {
// Attempt to stop itself if it deletes no events.
// Stop itself if it deletes no events.
// No event deletion means that it will also delete no snapshots.
self ! Stop
stopSelf()
}
}

Expand All @@ -765,8 +836,8 @@ private[entityreplication] class SnapshotSyncManager(
}
deleteSnapshots(deletionCriteria)
} else {
// Attempt to stop itself if it deletes no snapshots.
self ! Stop
// Stop itself if it deletes no snapshots.
stopSelf()
}
}

Expand Down