Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Persit EntitySnapshot as an event #184

Merged
merged 17 commits into from
Nov 22, 2022
Merged
Show file tree
Hide file tree
Changes from 15 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- Add function of Disabling raft actor [PR#173](https://github.com/lerna-stack/akka-entity-replication/pull/173)
- Add diagnostic info to logs of sending replication results
[PR#179](https://github.com/lerna-stack/akka-entity-replication/pull/179)
- Persist EntitySnapshot as an event
[PR#184](https://github.com/lerna-stack/akka-entity-replication/pull/184)

### Fixed
- RaftActor might delete committed entries
Expand Down
10 changes: 10 additions & 0 deletions src/main/resources/reference.conf
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,16 @@ lerna.akka.entityreplication {
max-snapshot-batch-size = 1000
}

// Entity snapshot store settings
// All entity snapshot histries are persisted for data recovery.
entity-snapshot-store {

// Frequency of taking snapshot of entity snapshot.
// By taking snapshot, optimizing recovery times of an Entity.
// Increasing this value increases snapshot writing, decreasing it increases recovery times.
snapshot-every = 1
}
negokaz marked this conversation as resolved.
Show resolved Hide resolved

sharding = ${akka.cluster.sharding} {
// Maximum number of messages that are buffered by a ShardRegion actor.
// Make it smaller than the default value to discard messages that are too old.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@ trait RaftSettings {

def snapshotSyncMaxSnapshotBatchSize: Int

def snapshotStoreSnapshotEvery: Int

def clusterShardingConfig: Config

def raftActorAutoStartFrequency: FiniteDuration
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ private[entityreplication] final case class RaftSettingsImpl(
snapshotSyncCopyingParallelism: Int,
snapshotSyncPersistenceOperationTimeout: FiniteDuration,
snapshotSyncMaxSnapshotBatchSize: Int,
snapshotStoreSnapshotEvery: Int,
clusterShardingConfig: Config,
raftActorAutoStartFrequency: FiniteDuration,
raftActorAutoStartNumberOfActors: Int,
Expand Down Expand Up @@ -156,6 +157,12 @@ private[entityreplication] object RaftSettingsImpl {
s"snapshot-sync.max-snapshot-batch-size (${snapshotSyncMaxSnapshotBatchSize}) should be larger than 0",
)

val snapshotStoreSnapshotEvery: Int = config.getInt("entity-snapshot-store.snapshot-every")
require(
snapshotStoreSnapshotEvery > 0,
s"entity-snapshot-store.snapshot-every ($snapshotStoreSnapshotEvery) should be larger than 0",
)

val clusterShardingConfig: Config = config.getConfig("sharding")

val raftActorAutoStartFrequency: FiniteDuration =
Expand Down Expand Up @@ -237,6 +244,7 @@ private[entityreplication] object RaftSettingsImpl {
snapshotSyncCopyingParallelism = snapshotSyncCopyingParallelism,
snapshotSyncPersistenceOperationTimeout = snapshotSyncPersistenceOperationTimeout,
snapshotSyncMaxSnapshotBatchSize = snapshotSyncMaxSnapshotBatchSize,
snapshotStoreSnapshotEvery = snapshotStoreSnapshotEvery,
clusterShardingConfig = clusterShardingConfig,
raftActorAutoStartFrequency = raftActorAutoStartFrequency,
raftActorAutoStartNumberOfActors = raftActorAutoStartNumberOfActors,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ package lerna.akka.entityreplication.raft.snapshot

import akka.actor.{ ActorLogging, ActorRef, Props, ReceiveTimeout }
import akka.persistence
import akka.persistence.{ PersistentActor, Recovery, SnapshotSelectionCriteria }
import akka.persistence.PersistentActor
import lerna.akka.entityreplication.model.{ NormalizedEntityId, TypeName }
import lerna.akka.entityreplication.raft.RaftSettings
import lerna.akka.entityreplication.raft.routing.MemberIndex
Expand Down Expand Up @@ -33,6 +33,8 @@ private[entityreplication] class SnapshotStore(
with ActorLogging {
import SnapshotProtocol._

private var replyRefCache: Option[ActorRef] = None

override def persistenceId: String =
SnapshotStore.persistenceId(typeName, entityId, selfMemberIndex)

Expand All @@ -42,20 +44,30 @@ private[entityreplication] class SnapshotStore(

context.setReceiveTimeout(settings.compactionSnapshotCacheTimeToLive)

// SequenceNr is always 0 because SnapshotStore doesn't persist events (only persist snapshots).
override def recovery: Recovery =
Recovery(
toSequenceNr = 0L,
fromSnapshot = SnapshotSelectionCriteria(maxSequenceNr = 0L, maxTimestamp = Long.MaxValue),
)

override def receiveRecover: Receive = {
case snapshot: EntitySnapshot => context.become(hasSnapshot(snapshot))
case akka.persistence.SnapshotOffer(_, s: EntitySnapshot) =>
context.become(hasSnapshot(s))
}

override def receiveCommand: Receive = hasNoSnapshot

override protected def onPersistFailure(cause: Throwable, event: Any, seqNr: Long): Unit = {
if (log.isWarningEnabled) {
log.warning(
"Saving snapshot failed - {}: {}",
cause.getClass.getCanonicalName,
cause.getMessage,
)
}
super.onPersistFailure(cause, event, seqNr)
replyRefCache.fold(
if (log.isWarningEnabled) log.warning("missing reply reference - {}", cause.getClass.getCanonicalName),
)(
_ ! SaveSnapshotFailure(event.asInstanceOf[EntitySnapshot].metadata),
)
}

def hasNoSnapshot: Receive = {
case command: Command =>
command match {
Expand All @@ -81,12 +93,19 @@ private[entityreplication] class SnapshotStore(
// reduce IO: don't save if same as cached snapshot
command.replyTo ! SaveSnapshotSuccess(command.snapshot.metadata)
} else {
saveSnapshot(command.snapshot)
context.become(savingSnapshot(command.replyTo, command.snapshot, prevSnapshot))
replyRefCache = Option(command.replyTo)
persistAsync(command.snapshot) { _ =>
command.replyTo ! SaveSnapshotSuccess(command.snapshot.metadata)
if (lastSequenceNr % settings.snapshotStoreSnapshotEvery == 0 && lastSequenceNr != 0) {
saveSnapshot(command.snapshot)
}
context.become(hasSnapshot(command.snapshot))
}
context.become(savingSnapshot(prevSnapshot))
}
}

def savingSnapshot(replyTo: ActorRef, snapshot: EntitySnapshot, prevSnapshot: Option[EntitySnapshot]): Receive = {
def savingSnapshot(prevSnapshot: Option[EntitySnapshot]): Receive = {
case command: Command =>
command match {
case cmd: SaveSnapshot =>
Expand All @@ -103,23 +122,24 @@ private[entityreplication] class SnapshotStore(
replyTo ! SnapshotProtocol.SnapshotNotFound(entityId)
}
}
case _: persistence.SaveSnapshotSuccess =>
replyTo ! SaveSnapshotSuccess(snapshot.metadata)
context.become(hasSnapshot(snapshot))
case failure: persistence.SaveSnapshotFailure =>
if (log.isWarningEnabled)
log.warning(
"Saving snapshot failed - {}: {}",
failure.cause.getClass.getCanonicalName,
failure.cause.getMessage,
)
replyTo ! SaveSnapshotFailure(snapshot.metadata)
}

override def unhandled(message: Any): Unit =
message match {
case ReceiveTimeout =>
context.stop(self)
case _: persistence.SaveSnapshotSuccess =>
if (log.isDebugEnabled) {
log.debug("Saving EntitySnapshot as a snapshot succeeded.")
}
case failure: persistence.SaveSnapshotFailure =>
if (log.isWarningEnabled) {
log.warning(
"Saving EntitySnapshot as a snapshot failed. - {}: {}",
failure.cause.getClass.getCanonicalName,
failure.cause.getMessage,
)
}
case _ =>
super.unhandled(message)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ final class RaftSettingsSpec extends TestKit(ActorSystem("RaftSettingsSpec")) wi
settings.snapshotSyncCopyingParallelism shouldBe 10
settings.snapshotSyncPersistenceOperationTimeout shouldBe 10.seconds
settings.snapshotSyncMaxSnapshotBatchSize shouldBe 1_000
settings.snapshotStoreSnapshotEvery shouldBe 1
settings.clusterShardingConfig shouldBe defaultConfig.getConfig("lerna.akka.entityreplication.raft.sharding")
settings.raftActorAutoStartFrequency shouldBe 3.seconds
settings.raftActorAutoStartNumberOfActors shouldBe 5
Expand Down Expand Up @@ -179,6 +180,29 @@ final class RaftSettingsSpec extends TestKit(ActorSystem("RaftSettingsSpec")) wi
}
}

"throw an IllegalArgumentException if the given entity-snapshot-store.snapshot-every is less than or equal to 0" in {
{
val config = ConfigFactory
.parseString("""
|lerna.akka.entityreplication.raft.entity-snapshot-store.snapshot-every = -1
|""".stripMargin)
.withFallback(defaultConfig)
a[IllegalArgumentException] shouldBe thrownBy {
RaftSettings(config)
}
}
{
val config = ConfigFactory
.parseString("""
|lerna.akka.entityreplication.raft.entity-snapshot-store.snapshot-every = 0
|""".stripMargin)
.withFallback(defaultConfig)
a[IllegalArgumentException] shouldBe thrownBy {
RaftSettings(config)
}
}
}

"throw an IllegalArgumentException if the given raft-actor-auto-start.frequency is 0 milli" in {
val config = ConfigFactory
.parseString("""
Expand Down
Loading